Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement clean shutdown, currently synchronous. Should I implement

async shutdown with callback support?
  • Loading branch information...
commit d805fdd10afd4ff5074ce08c023bd42928e06678 1 parent 3d32a27
@mperham authored
View
8 README.md
@@ -29,14 +29,14 @@ Put girl_friday in your Gemfile:
In your Rails app, create a `config/initializers/girl_friday.rb` which defines your queues:
- EMAIL_QUEUE = GirlFriday::WorkQueue.new('user_email', :size => 3) do |msg|
+ EMAIL_QUEUE = GirlFriday::WorkQueue.new(:user_email, :size => 3) do |msg|
UserMailer.registration_email(msg).deliver
end
- IMAGE_QUEUE = GirlFriday::WorkQueue.new('image_crawler', :size => 7) do |msg|
+ IMAGE_QUEUE = GirlFriday::WorkQueue.new(:image_crawler, :size => 7) do |msg|
ImageCrawler.process(msg)
end
-:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database, you'll want to insure that the connection pool is large enough by modifying `config/database.yml`.
+:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.
In your controller action or model, you can call `#push(msg)`
@@ -50,7 +50,7 @@ Your message processing block should **not** access any instance data or variabl
More Detail
--------------------
-Please see the [girl_friday wiki](https://github.com/mperham/girl_friday/wiki) for more detail and advanced options and tuning.
+Please see the [girl_friday wiki](https://github.com/mperham/girl_friday/wiki) for more detail and advanced options and tuning. You'll find details on queue persistence with Redis, implementing clean shutdown, querying runtime metrics and SO MUCH MORE!
Thanks
View
1  TODO.md
@@ -1,6 +1,5 @@
TODO
===============
- - clean shutdown (drain queues)
- web admin UI to surface status() metrics
- nicer project homepage
View
37 lib/girl_friday.rb
@@ -1,3 +1,4 @@
+require 'thread'
begin
# Rubinius
require 'actor'
@@ -10,4 +11,38 @@
require 'girl_friday/version'
require 'girl_friday/work_queue'
require 'girl_friday/error_handler'
-require 'girl_friday/persistence'
+require 'girl_friday/persistence'
+
+module GirlFriday
+
+ def self.status
+ ObjectSpace.each_object(WorkQueue).inject({}) { |memo, queue| memo.merge(queue.status) }
+ end
+
+ ##
+ # Notify girl_friday to shutdown ASAP. Workers will not pick up any
+ # new work; any new work pushed onto the queues will be pushed onto the
+ # backlog (and persisted). This method will block until all queues are
+ # quiet or the timeout has passed.
+ def self.shutdown!(timeout=30)
+ queues = []
+ ObjectSpace.each_object(WorkQueue).each { |q| queues << q }
+ count = queues.size
+ m = Mutex.new
+ var = ConditionVariable.new
+
+ queues.each do |q|
+ q.shutdown do |queue|
+ m.synchronize do
+ count -= 1
+ var.signal if count == 0
+ end
+ end
+ end
+
+ m.synchronize do
+ var.wait(m, timeout)
+ end
+ end
+
+end
View
87 lib/girl_friday/work_queue.rb
@@ -1,12 +1,9 @@
module GirlFriday
- def self.status
- ObjectSpace.each_object(WorkQueue).inject({}) { |memo, queue| memo.merge(queue.status) }
- end
-
class WorkQueue
Ready = Struct.new(:this)
Work = Struct.new(:msg, :callback)
+ Shutdown = Struct.new(:callback)
attr_reader :name
def initialize(name, options={}, &block)
@@ -15,6 +12,7 @@ def initialize(name, options={}, &block)
@processor = block
@error_handler = (options[:error_handler] || ErrorHandler.default).new
+ @shutdown = false
@ready_workers = []
@busy_workers = []
@started_at = Time.now.to_i
@@ -24,8 +22,7 @@ def initialize(name, options={}, &block)
end
def push(work, &block)
- @total_queued += 1
- @actor << Work[work, block]
+ @supervisor << Work[work, block]
end
alias_method :<<, :push
@@ -45,10 +42,47 @@ def status
}
end
+ def shutdown
+ # Runtime state should never be modified by caller thread,
+ # only the Supervisor thread.
+ @supervisor << Shutdown[block_given? ? Proc.new : nil]
+ end
+
private
+ def on_ready(who)
+ @total_processed += 1
+ if !@shutdown && work = @persister.pop
+ who.this << work
+ drain(@ready_workers, @persister)
+ else
+ @busy_workers.delete(who.this)
+ @ready_workers << who.this
+ shutdown_complete if @shutdown && @busy_workers.size == 0
+ end
+ end
+
+ def shutdown_complete
+ begin
+ @when_shutdown.call(self) if @when_shutdown
+ rescue Exception => ex
+ @error_handler.handle(ex)
+ end
+ end
+
+ def on_work(work)
+ @total_queued += 1
+ if !@shutdown && worker = @ready_workers.pop
+ @busy_workers << worker
+ worker << work
+ drain(@ready_workers, @persister)
+ else
+ @persister << work
+ end
+ end
+
def start
- @actor = Actor.spawn do
+ @supervisor = Actor.spawn do
supervisor = Actor.current
work_loop = Proc.new do
loop do
@@ -69,27 +103,20 @@ def start
loop do
Actor.receive do |f|
f.when(Ready) do |who|
- @total_processed += 1
- if work = @persister.pop
- who.this << work
- drain(@ready_workers, @persister)
- else
- @busy_workers.delete(who.this)
- @ready_workers << who.this
- end
+ on_ready(who)
end
f.when(Work) do |work|
- if worker = @ready_workers.pop
- @busy_workers << worker
- worker << work
- drain(@ready_workers, @persister)
- else
- @persister << work
- end
+ on_work(work)
+ end
+ f.when(Shutdown) do |stop|
+ @shutdown = true
+ @when_shutdown = stop.callback
+ shutdown_complete if @shutdown && @busy_workers.size == 0
end
f.when(Actor::DeadActorError) do |exit|
# TODO Provide current message contents as error context
@total_errors += 1
+ @busy_workers.delete(exit.actor)
@ready_workers << Actor.spawn_link(&work_loop)
@error_handler.handle(exit.reason)
end
@@ -102,15 +129,15 @@ def start
$stderr.print("#{ex.backtrace.join("\n")}\n")
end
end
+ end
- def drain(ready, work)
- # give as much work to as many ready workers as possible
- todo = ready.size < work.size ? ready.size : work.size
- todo.times do
- worker = ready.pop
- @busy_workers << worker
- worker << work.pop
- end
+ def drain(ready, work)
+ # give as much work to as many ready workers as possible
+ todo = ready.size < work.size ? ready.size : work.size
+ todo.times do
+ worker = ready.pop
+ @busy_workers << worker
+ worker << work.pop
end
end
View
31 test/test_girl_friday.rb
@@ -46,7 +46,7 @@ def test_should_call_callback_when_complete
def test_should_provide_status
mutex = Mutex.new
- total = 100
+ total = 200
count = 0
incr = Proc.new do
mutex.synchronize do
@@ -108,4 +108,33 @@ def test_should_persist_with_redis
end
end
end
+
+ def test_should_allow_graceful_shutdown
+ mutex = Mutex.new
+ total = 100
+ count = 0
+ incr = Proc.new do
+ mutex.synchronize do
+ count += 1
+ end
+ end
+
+ async_test do |cb|
+ queue = GirlFriday::WorkQueue.new('shutdown', :size => 2) do |msg|
+ incr.call
+ cb.call if count == total
+ end
+ total.times do
+ queue.push(:text => 'foo')
+ end
+
+ GirlFriday.shutdown!
+ s = queue.status
+ assert_equal 0, s['shutdown'][:busy]
+ assert_equal 2, s['shutdown'][:ready]
+ assert(s['shutdown'][:backlog] > 0)
+ cb.call
+ end
+ end
+
end
Please sign in to comment.
Something went wrong with that request. Please try again.