diff --git a/lib/delayed/backend/base.rb b/lib/delayed/backend/base.rb index c50465cc3..4a330d623 100644 --- a/lib/delayed/backend/base.rb +++ b/lib/delayed/backend/base.rb @@ -41,7 +41,7 @@ def enqueue(*args) def reserve(worker, max_run_time = Worker.max_run_time) # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. # this leads to a more even distribution of jobs across the worker processes - find_available(worker.name, 5, max_run_time).detect do |job| + find_available(worker.name, worker.read_ahead, max_run_time).detect do |job| job.lock_exclusively!(max_run_time, worker.name) end end diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index c83b34cff..250e72adc 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -279,6 +279,27 @@ def create_job(opts = {}) end end + context "worker read-ahead" do + before do + @read_ahead = Delayed::Worker.read_ahead + end + + after do + Delayed::Worker.read_ahead = @read_ahead + end + + it "should read five jobs" do + described_class.should_receive(:find_available).with(anything, 5, anything).and_return([]) + described_class.reserve(worker) + end + + it "should read a configurable number of jobs" do + Delayed::Worker.read_ahead = 15 + described_class.should_receive(:find_available).with(anything, Delayed::Worker.read_ahead, anything).and_return([]) + described_class.reserve(worker) + end + end + context "clear_locks!" do before do @job = create_job(:locked_by => 'worker1', :locked_at => described_class.db_time_now) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 8f7d397cf..f65cd0a44 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -7,13 +7,14 @@ module Delayed class Worker - cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues + cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, :read_ahead self.sleep_delay = 5 self.max_attempts = 25 self.max_run_time = 4.hours self.default_priority = 0 self.delay_jobs = true self.queues = [] + self.read_ahead = 5 # Add or remove plugins in this list before the worker is instantiated cattr_accessor :plugins @@ -82,6 +83,7 @@ def initialize(options={}) self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority) self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority) self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay) + self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead) self.class.queues = options[:queues] if options.has_key?(:queues) self.plugins.each { |klass| klass.new }