Skip to content

Commit

Permalink
Merge branch 'exit-when-done'
Browse files Browse the repository at this point in the history
Conflicts:
	README.md
  • Loading branch information
albus522 committed Jan 28, 2013
2 parents 5b5f971 + 475b081 commit bc25265
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 18 deletions.
7 changes: 7 additions & 0 deletions README.md
Expand Up @@ -172,13 +172,20 @@ You can then do the following:
RAILS_ENV=production script/delayed_job --queue=tracking start
RAILS_ENV=production script/delayed_job --queues=mailers,tasks start

# Runs all available jobs and the exits
RAILS_ENV=production script/delayed_job start --exit-on-complete
# or to run in the foreground
RAILS_ENV=production script/delayed_job run --exit-on-complete

Workers can be running on any computer, as long as they have access to the
database and their clock is in sync. Keep in mind that each worker will check
the database at least every 5 seconds.

You can also invoke `rake jobs:work` which will start working off jobs. You can
cancel the rake task with `CTRL-C`.

If you want to just run all available jobs and exit you can use `rake jobs:workoff`

Work off queues by setting the `QUEUE` or `QUEUES` environment variable.

QUEUE=tracking rake jobs:work
Expand Down
3 changes: 3 additions & 0 deletions lib/delayed/command.rb
Expand Up @@ -61,6 +61,9 @@ def initialize(args)
opts.on('--queue=queue', "Specify which queue DJ must look up for jobs") do |queue|
@options[:queues] = queue.split(',')
end
opts.on('--exit-on-complete', "Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.") do
@options[:exit_on_complete] = true
end
end
@args = opts.parse!(args)
end
Expand Down
18 changes: 16 additions & 2 deletions lib/delayed/tasks.rb
Expand Up @@ -5,7 +5,21 @@
end

desc "Start a delayed_job worker."
task :work => :environment do
Delayed::Worker.new(:min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), :quiet => false).start
task :work => :environment_options do
Delayed::Worker.new(@worker_options).start
end

desc "Start a delayed_job worker and exit when all available jobs are complete."
task :workoff => :environment_options do
Delayed::Worker.new(@worker_options.merge({:exit_on_complete => true})).start
end

task :environment_options => :environment do
@worker_options = {
:min_priority => ENV['MIN_PRIORITY'],
:max_priority => ENV['MAX_PRIORITY'],
:queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
:quiet => false
}
end
end
32 changes: 16 additions & 16 deletions lib/delayed/worker.rb
Expand Up @@ -18,7 +18,7 @@ class Worker

cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
:read_ahead, :plugins, :destroy_failed_jobs
:read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete

# Named queue into which jobs are enqueued by default
cattr_accessor :default_queue_name
Expand Down Expand Up @@ -97,11 +97,10 @@ def self.lifecycle

def initialize(options={})
@quiet = options.has_key?(:quiet) ? options[:quiet] : true
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
self.class.queues = options[:queues] if options.has_key?(:queues)

[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
self.class.send("#{option}=", options[option]) if options.has_key?(option)
end

self.plugins.each { |klass| klass.new }
end
Expand Down Expand Up @@ -130,21 +129,22 @@ def start
self.class.lifecycle.run_callbacks(:execute, self) do
loop do
self.class.lifecycle.run_callbacks(:loop, self) do
result = nil

realtime = Benchmark.realtime do
result = work_off
@realtime = Benchmark.realtime do
@result = work_off
end
end

count = result.sum

break if stop?
count = @result.sum

if count.zero?
sleep(self.class.sleep_delay)
if count.zero?
if self.class.exit_on_complete
say "No more jobs available. Exiting"
break
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
sleep(self.class.sleep_delay) unless stop?
end
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / @realtime, @result.last]
end

break if stop?
Expand Down
17 changes: 17 additions & 0 deletions spec/worker_spec.rb
Expand Up @@ -41,4 +41,21 @@
Delayed::Job.reserve(Delayed::Worker.new)
end
end

context "worker exit on complete" do
before do
Delayed::Worker.exit_on_complete = true
end

after do
Delayed::Worker.exit_on_complete = false
end

it "exits the loop when no jobs are available" do
worker = Delayed::Worker.new
Timeout::timeout(2) do
worker.start
end
end
end
end

0 comments on commit bc25265

Please sign in to comment.