Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

shutdown waits for all work to finish processing instead of just waiting for the queue to be drained before returning #55

Merged
merged 3 commits into from

3 participants

@joenoon

No description provided.

joenoon added some commits
@xaviershay
Collaborator

Read through the commits, looks pretty good to me. I'll let it sit here for a day or two in case others want to comment.

@mperham mperham merged commit 5f24164 into from
@mperham
Owner

Thanks Joe. Don't forget to update the changelog in the future.

@joenoon

Great, will do next time!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 15, 2012
  1. @joenoon

    shutdown waits for all work to finish processing instead of just wait…

    joenoon authored
    …ing for the
    
    queue to be drained before returning
  2. @joenoon

    handle shutting down state more gracefully. was previously waiting

    joenoon authored
    for the queue to drain, now lets work fully complete, but lets new work
    sit in the persister
  3. @joenoon
This page is out of date. Refresh to see the latest.
Showing with 50 additions and 10 deletions.
  1. +24 −10 lib/girl_friday/work_queue.rb
  2. +26 −0 test/test_girl_friday_queue.rb
View
34 lib/girl_friday/work_queue.rb
@@ -14,6 +14,7 @@ def initialize(name, options={}, &block)
@error_handlers = (Array(options[:error_handler] || ErrorHandler.default)).map(&:new)
@shutdown = false
+ @shutting_down = false
@busy_workers = []
@ready_workers = nil
@created_at = Time.now.to_i
@@ -76,6 +77,10 @@ def shutdown(&block)
@supervisor << Shutdown[block]
end
+ def working?
+ @busy_workers.size > 0
+ end
+
private
def running?
@@ -89,7 +94,7 @@ def handle_error(ex)
def on_ready(who)
@total_processed += 1
- if running? && work = @persister.pop
+ if !shutting_down? && running? && work = @persister.pop
who.this << work
drain
else
@@ -108,10 +113,13 @@ def shutdown_complete
end
end
+ def shutting_down?
+ !!@shutting_down
+ end
+
def on_work(work)
@total_queued += 1
-
- if running? && worker = ready_workers.pop
+ if !shutting_down? && running? && worker = ready_workers.pop
@busy_workers << worker
worker << work
drain
@@ -174,13 +182,19 @@ def supervisor_loop
on_work(work)
end
f.when(Shutdown) do |stop|
- @shutdown = true
- @when_shutdown = stop.callback
- @busy_workers.each { |w| w << stop }
- ready_workers.each { |w| w << stop }
- shutdown_complete
- GirlFriday.remove_queue @weakref
- return
+ @shutting_down = true
+ if !working?
+ @shutdown = true
+ @when_shutdown = stop.callback
+ @busy_workers.each { |w| w << stop }
+ ready_workers.each { |w| w << stop }
+ shutdown_complete
+ GirlFriday.remove_queue @weakref
+ return
+ else
+ Thread.pass
+ shutdown(&stop.callback)
+ end
end
f.when(Actor::DeadActorError) do |ex|
if running?
View
26 test/test_girl_friday_queue.rb
@@ -189,6 +189,32 @@ def test_should_allow_graceful_shutdown
end
end
+ def test_should_allow_in_progress_work_to_finish
+ mutex = Mutex.new
+ total = 8
+ count = 0
+ incr = Proc.new do
+ mutex.synchronize do
+ count += 1
+ end
+ end
+
+ async_test(10) do |cb|
+ queue = GirlFriday::WorkQueue.new('finish', :size => 10) do |msg|
+ sleep 1
+ incr.call
+ end
+ total.times do
+ queue.push(:text => 'foo')
+ end
+
+ GirlFriday.shutdown!
+ assert_equal total, queue.instance_variable_get("@total_processed")
+ assert_equal total, count
+ cb.call
+ end
+ end
+
def test_should_create_workers_lazily
async_test do |cb|
queue = GirlFriday::Queue.new('lazy', :size => 2) do |msg|
Something went wrong with that request. Please try again.