Skip to content

Commit

Permalink
Revert "When using the activerecord backend, reserve jobs with an upd…
Browse files Browse the repository at this point in the history
…ate query instead of trying to lock a batch of available jobs. This causes workers to serialize on the database and avoids contention between them."

This reverts commit 5c57091.
  • Loading branch information
betamatt committed Dec 2, 2010
1 parent 075fc7e commit edb5bc3
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 71 deletions.
1 change: 0 additions & 1 deletion generators/delayed_job/templates/migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def self.up
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by'
end

def self.down
Expand Down
32 changes: 5 additions & 27 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ class Job < ::ActiveRecord::Base
}
named_scope :by_priority, :order => 'priority ASC, run_at ASC'

named_scope :locked_by_worker, lambda{|worker_name, max_run_time|
{:conditions => ['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time]}
}

def self.after_fork
::ActiveRecord::Base.connection.reconnect!
end
Expand All @@ -42,33 +38,15 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

def self.jobs_available_to_worker(worker_name, max_run_time)

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
scope.by_priority
end

# Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name], jobs_available_to_worker(worker.name, max_run_time).scope(:find)[:conditions], :limit => 1)
end

if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)

::ActiveRecord::Base.silence do
jobs_available_to_worker(worker_name, max_run_time).all(:limit => limit)
scope.by_priority.all(:limit => limit)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def enqueue(*args)
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(worker.name, 5, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
find_available(worker, 5, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def handle_failed_job(job, error)
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def reserve_and_run_one_job
job = Delayed::Job.reserve(self)
job = Delayed::Job.reserve(worker, self.class.max_run_time)
run(job) if job
end
end
Expand Down
40 changes: 0 additions & 40 deletions spec/backend/shared_backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,46 +173,6 @@ def create_job(opts = {})
end
end

describe "reserve" do
before do
Delayed::Worker.max_run_time = 2.minutes
@worker = Delayed::Worker.new(:quiet => true)
end

it "should not reserve failed jobs" do
create_job :attempts => 50, :failed_at => described_class.db_time_now
described_class.reserve(@worker).should be_nil
end

it "should not reserve jobs scheduled for the future" do
create_job :run_at => (described_class.db_time_now + 1.minute)
described_class.reserve(@worker).should be_nil
end

it "should lock the job so other workers can't reserve it" do
job = create_job
described_class.reserve(@worker).should == job
new_worker = Delayed::Worker.new(:quiet => true)
new_worker.name = 'worker2'
described_class.reserve(new_worker).should be_nil
end

it "should reserve open jobs" do
job = create_job
described_class.reserve(@worker).should == job
end

it "should reserve expired jobs" do
job = create_job(:locked_by => @worker.name, :locked_at => described_class.db_time_now - 3.minutes)
described_class.reserve(@worker).should == job
end

it "should reserve own jobs" do
job = create_job(:locked_by => @worker.name, :locked_at => (described_class.db_time_now - 1.minutes))
described_class.reserve(@worker).should == job
end
end

context "#name" do
it "should be the class name of the job that was enqueued" do
@backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
Expand Down

0 comments on commit edb5bc3

Please sign in to comment.