Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first shot at crappy pre forking

  • Loading branch information...
commit 7ed4ca06fc4874834a6da7808a8df2b236dc038f 1 parent 4499a39
@mrb authored
Showing with 59 additions and 7 deletions.
  1. +10 −7 lib/resque/tasks.rb
  2. +49 −0 lib/resque/worker.rb
View
17 lib/resque/tasks.rb
@@ -11,9 +11,12 @@
queues = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',')
begin
- worker = Resque::Worker.new(*queues)
- worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
- worker.very_verbose = ENV['VVERBOSE']
+ #worker = Resque::Worker.new(*queues)
+ supervisor = Resque::Supervisor.new
+ supervisor.spawn_workers
+ supervisor.watch_workers
+ #worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
+ #worker.very_verbose = ENV['VVERBOSE']
rescue Resque::NoQueueError
abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work"
end
@@ -25,11 +28,11 @@
Process.daemon(true)
end
- if ENV['PIDFILE']
- File.open(ENV['PIDFILE'], 'w') { |f| f << worker.pid }
- end
+ #if ENV['PIDFILE']
+ # File.open(ENV['PIDFILE'], 'w') { |f| f << worker.pid }
+ #end
- worker.log "Starting worker #{worker}"
+ #worker.log "Starting worker #{worker}"
worker.work(ENV['INTERVAL'] || 5) # interval, will block
end
View
49 lib/resque/worker.rb
@@ -1,4 +1,52 @@
module Resque
+ class Supervisor
+ PRE_FORKS = 3
+
+ def initialize
+ $0 = "Resque Supervisor"
+ @workers = []
+ end
+
+ def spawn_worker
+ worker = Resque::Worker.new
+ @workers << worker
+ worker.work(ENV['INTERVAL'] || 5)
+ end
+
+ def spawn_workers
+ while @workers.length < (PRE_FORKS+1)
+ spawn_worker
+ end
+ end
+
+ def watch_workers
+ loop{
+ Signal.trap('CLD') {
+ begin
+ @workers.delete(find_worker_by_pid(Process.wait))
+ rescue Errno::ECHILD
+ end
+ spawn_worker
+ }
+ logger.info "Supervisor #{Process.pid}: #{@workers.length} Workers"
+ spawn_workers if @workers.length < PRE_FORKS
+ sleep 1
+ }
+ end
+
+ def find_worker_by_pid(pid)
+ @workers.select{|w| w.child == pid}[0]
+ end
+
+ def logger
+ @logger ||= Logger.new(STDOUT)
+ end
+
+ def to_s
+ "#<Supervisor:#{self.object_id} @workers=#{@workers}>"
+ end
+ end
+
# A Resque Worker processes jobs. On platforms that support fork(2),
# the worker will fork off a child to process each job. This ensures
# a clean slate when beginning the next job and cuts down on gradual
@@ -134,6 +182,7 @@ def work(interval = 5.0, &block)
if @child = fork
srand # Reseeding
procline "Forked #{@child} at #{Time.now.to_i}"
+ $0 = "Resque Worker"
Process.wait
else
procline "Processing #{job.queue} since #{Time.now.to_i}"
Please sign in to comment.
Something went wrong with that request. Please try again.