Skip to content

Commit

Permalink
Merge pull request collectiveidea#342 from dcuddeback/master
Browse files Browse the repository at this point in the history
Makes the number of jobs read from the queue a configuration option
  • Loading branch information
bryckbost committed Jan 27, 2012
2 parents 91523ab + b6c1496 commit 37b1a4a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ Delayed::Worker.destroy_failed_jobs = false. The failed jobs will be marked with

By default all jobs are scheduled with priority = 0, which is top priority. You can change this by setting Delayed::Worker.default_priority to something else. Lower numbers have higher priority.

The default behavior is to read 5 jobs from the queue when finding an available job. You can configure this by setting Delayed::Worker.read_ahead.

It is possible to disable delayed jobs for testing purposes. Set Delayed::Worker.delay_jobs = false to execute all jobs realtime.

Here is an example of changing job parameters in Rails:
Expand All @@ -262,6 +264,7 @@ Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 5.minutes
Delayed::Worker.read_ahead = 10
Delayed::Worker.delay_jobs = !Rails.env.test?
</pre>

Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def enqueue(*args)
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(worker.name, 5, max_run_time).detect do |job|
find_available(worker.name, worker.read_ahead, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
end
end
Expand Down
21 changes: 21 additions & 0 deletions lib/delayed/backend/shared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,27 @@ def create_job(opts = {})
end
end

context "worker read-ahead" do
before do
@read_ahead = Delayed::Worker.read_ahead
end

after do
Delayed::Worker.read_ahead = @read_ahead
end

it "should read five jobs" do
described_class.should_receive(:find_available).with(anything, 5, anything).and_return([])
described_class.reserve(worker)
end

it "should read a configurable number of jobs" do
Delayed::Worker.read_ahead = 15
described_class.should_receive(:find_available).with(anything, Delayed::Worker.read_ahead, anything).and_return([])
described_class.reserve(worker)
end
end

context "clear_locks!" do
before do
@job = create_job(:locked_by => 'worker1', :locked_at => described_class.db_time_now)
Expand Down
3 changes: 3 additions & 0 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def initialize(args)
opts.on('--sleep-delay N', "Amount of time to sleep when no jobs are found") do |n|
@options[:sleep_delay] = n
end
opts.on('--read-ahead N', "Number of jobs from the queue to consider") do |n|
@options[:read_ahead] = n
end
opts.on('-p', '--prefix NAME', "String to be prefixed to worker process names") do |prefix|
@options[:prefix] = prefix
end
Expand Down
4 changes: 3 additions & 1 deletion lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

module Delayed
class Worker
cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues
cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, :read_ahead
self.sleep_delay = 5
self.max_attempts = 25
self.max_run_time = 4.hours
self.default_priority = 0
self.delay_jobs = true
self.queues = []
self.read_ahead = 5

# Add or remove plugins in this list before the worker is instantiated
cattr_accessor :plugins
Expand Down Expand Up @@ -82,6 +83,7 @@ def initialize(options={})
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
self.class.queues = options[:queues] if options.has_key?(:queues)

self.plugins.each { |klass| klass.new }
Expand Down

0 comments on commit 37b1a4a

Please sign in to comment.