Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

active record changes and fixing some small bugs.

  • Loading branch information...
commit 4b9b0799622befa5c72e8650a26c2b7c3bde76ee 1 parent 0be1cfa
@rares rares authored
Showing with 33 additions and 20 deletions.
  1. +17 −20 lib/delayed/job.rb
  2. +16 −0 spec/job_spec.rb
View
37 lib/delayed/job.rb
@@ -13,7 +13,9 @@ class Job < ActiveRecord::Base
self.min_priority = nil
self.max_priority = nil
- NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
+ # Conditions to find tasks that are locked by this process or one that has
+ # been created before now and is not currently locked.
+ NextTaskSQL = '(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
@@ -21,7 +23,7 @@ class LockError < StandardError
end
def self.clear_locks!
- connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
+ update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
def payload_object
@@ -65,23 +67,24 @@ def self.find_available(limit = 5)
time_now = db_time_now
sql = NextTaskSQL.dup
- conditions = [time_now, time_now, worker_name]
+ conditions = [worker_name, time_now, time_now]
if self.min_priority
- sql << ' AND (`priority` >= ?)'
+ sql << ' AND (priority >= ?)'
conditions << min_priority
end
if self.max_priority
- sql << ' AND (`priority` <= ?)'
+ sql << ' AND (priority <= ?)'
conditions << max_priority
end
conditions.unshift(sql)
-
+
ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
+
end
@@ -89,7 +92,7 @@ def self.find_available(limit = 5)
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
- # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
+ # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(5).each do |job|
begin
@@ -124,22 +127,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
- connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
- UPDATE #{self.class.table_name}
- SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
- WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
- end_sql
-
+ transaction do
+ Job.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ end
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
- connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
- UPDATE #{self.class.table_name}
- SET `locked_at`=#{quote_value(now)}
- WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
- end_sql
-
+ transaction do
+ Job.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
+ end
end
unless affected_rows == 1
@@ -149,7 +146,7 @@ def lock_exclusively!(max_run_time, worker = worker_name)
self.locked_at = now
self.locked_by = worker
end
-
+
def unlock
self.locked_at = nil
self.locked_by = nil
View
16 spec/job_spec.rb
@@ -173,4 +173,20 @@ def perform; raise 'did not work'; end
end
+ context "when retreiving jobs" do
+ before(:each) do
+ @simple_job = SimpleJob.new
+ @job = Delayed::Job.create :payload_object => @simple_job, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
+ end
+
+ it "should return jobs that haven't been processed yet" do
+ SimpleJob.runs.should == 0
+ # Delayed::Job.should_receive(:find_available).once.with(5).and_return([@job])
+ Delayed::Job.should_receive(:reserve).once.and_yield(@job.payload_object)
+ Delayed::Job.work_off(1)
+ SimpleJob.runs.should == 1
+ end
+
+ end
+
end
Please sign in to comment.
Something went wrong with that request. Please try again.