Skip to content

Commit

Permalink
Merge branch 'v2.1-reserve' of https://github.com/Viximo/delayed_job
Browse files Browse the repository at this point in the history
* 'v2.1-reserve' of https://github.com/Viximo/delayed_job:
  Fix use of limit for Rails 3
  When using the activerecord backend, reserve jobs with an update query instead of trying to lock a batch of available jobs.  This causes workers to serialize on the database and avoids contention between them.
  • Loading branch information
bkeepers committed Dec 1, 2010
2 parents 48d1281 + 6b0e480 commit 1ad4453
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
32 changes: 27 additions & 5 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class Job < ::ActiveRecord::Base
}
scope :by_priority, order('priority ASC, run_at ASC')

scope :locked_by_worker, lambda{|worker_name, max_run_time|
where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time])
}

def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
Expand All @@ -28,15 +32,33 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)

def self.jobs_available_to_worker(worker_name, max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority

scope.by_priority
end

# Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = jobs_available_to_worker(worker.name, max_run_time).limit(1).update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name])
end

if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
jobs_available_to_worker(worker_name, max_run_time).limit(limit).all
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/generators/delayed_job/templates/migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def self.up
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by'
end

def self.down
Expand Down

0 comments on commit 1ad4453

Please sign in to comment.