Skip to content

Commit

Permalink
Revert "Merge branch 'v2.1-reserve' of https://github.com/Viximo/dela…
Browse files Browse the repository at this point in the history
…yed_job"

This reverts commit 1ad4453, reversing
changes made to 48d1281.
  • Loading branch information
betamatt committed Dec 2, 2010
1 parent d3acec0 commit ddc0d20
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 28 deletions.
32 changes: 5 additions & 27 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ class Job < ::ActiveRecord::Base
}
scope :by_priority, order('priority ASC, run_at ASC')

scope :locked_by_worker, lambda{|worker_name, max_run_time|
where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time])
}

def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
Expand All @@ -32,33 +28,15 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

def self.jobs_available_to_worker(worker_name, max_run_time)

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
scope.by_priority
end

# Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = jobs_available_to_worker(worker.name, max_run_time).limit(1).update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name])
end

if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)

::ActiveRecord::Base.silence do
jobs_available_to_worker(worker_name, max_run_time).limit(limit).all
scope.by_priority.all(:limit => limit)
end
end

Expand Down
1 change: 0 additions & 1 deletion lib/generators/delayed_job/templates/migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def self.up
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by'
end

def self.down
Expand Down

0 comments on commit ddc0d20

Please sign in to comment.