Permalink
Browse files

Merge branch 'master' into jbarnette/master

Conflicts:

	tasks/jobs.rake
  • Loading branch information...
2 parents 450908d + 249c5a9 commit b9ebbb4ba0679bc0b003ab4e85dd2731325d93e2 @tobi committed Nov 1, 2008
Showing with 334 additions and 279 deletions.
  1. +4 −4 README.textile
  2. +94 −77 lib/delayed/job.rb
  3. +2 −2 lib/delayed/message_sending.rb
  4. +20 −13 lib/delayed/performable_method.rb
  5. +13 −8 lib/delayed/worker.rb
  6. +28 −26 spec/database.rb
  7. +60 −62 spec/delayed_method_spec.rb
  8. +101 −74 spec/job_spec.rb
  9. +11 −12 spec/story_spec.rb
  10. +1 −1 tasks/jobs.rake
View
@@ -27,10 +27,10 @@ The library evolves around a delayed_jobs table which looks as follows:
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
- table.datetime :run_at
- table.datetime :locked_at
- table.string :locked_by
- table.timestamps
+ table.datetime :run_at
+ table.datetime :locked_at
+ table.string :locked_by
+ table.timestamps
end
h2. Usage
View
@@ -2,27 +2,28 @@
module Delayed
class DeserializationError < StandardError
- end
+ end
- class Job < ActiveRecord::Base
- MAX_ATTEMPTS = 25
+ class Job < ActiveRecord::Base
+ MAX_ATTEMPTS = 25
set_table_name :delayed_jobs
- cattr_accessor :worker_name
- self.worker_name = "pid:#{Process.pid}"
-
-
- NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
+ cattr_accessor :worker_name, :min_priority, :max_priority
+ self.worker_name = "pid:#{Process.pid}"
+ self.min_priority = nil
+ self.max_priority = nil
+
+ NextTaskSQL = '(`locked_by` = ?) OR (`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?))'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
class LockError < StandardError
- end
+ end
def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end
-
+
def payload_object
@payload_object ||= deserialize(self['handler'])
end
@@ -31,43 +32,59 @@ def name
text = handler.gsub(/\n/, ' ')
"#{id} (#{text.length > 40 ? "#{text[0..40]}..." : text})"
end
-
+
def payload_object=(object)
self['handler'] = object.to_yaml
end
-
- def reschedule(message, time = nil)
-
+
+ def reschedule(message, time = nil)
if self.attempts < MAX_ATTEMPTS
time ||= Job.db_time_now + (attempts ** 4) + 5
self.attempts += 1
self.run_at = time
self.last_error = message
- self.unlock
+ self.unlock
save!
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy
end
end
-
-
+
def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
- raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
+ raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
-
- Job.create(:payload_object => object, :priority => priority)
- end
-
+
+ Job.create(:payload_object => object, :priority => priority.to_i)
+ end
+
def self.find_available(limit = 5)
- time_now = db_time_now
- ActiveRecord::Base.silence do
- find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)
+
+ time_now = db_time_now
+
+ sql = NextTaskSQL.dup
+ conditions = [time_now, time_now, worker_name]
+
+ if self.min_priority
+ sql << ' AND (`priority` >= ?)'
+ conditions << min_priority
end
- end
-
+
+ if self.max_priority
+ sql << ' AND (`priority` <= ?)'
+ conditions << max_priority
+ end
+
+ conditions.unshift(sql)
+
+ ActiveRecord::Base.silence do
+ find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
+ end
+ end
+
+
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = 4.hours)
@@ -92,30 +109,30 @@ def self.reserve(max_run_time = 4.hours)
job.reschedule e.message
logger.error "* [JOB] #{job.name} failed with #{e.class.name}: #{e.message} - #{job.attempts} failed attempts"
logger.error(e)
- return job
+ return job
end
end
nil
- end
+ end
# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
- now = self.class.db_time_now
+ now = self.class.db_time_now
- affected_rows = if locked_by != worker
-
+ affected_rows = if locked_by != worker
+
# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now - max_run_time.to_i)})
end_sql
- else
-
- # We alrady own this job, this may happen if the job queue crashes.
+ else
+
+ # We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
@@ -124,95 +141,95 @@ def lock_exclusively!(max_run_time, worker = worker_name)
end_sql
end
-
- unless affected_rows == 1
+
+ unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
- end
-
+ end
+
self.locked_at = now
- self.locked_by = worker
- end
-
+ self.locked_by = worker
+ end
+
def unlock
self.locked_at = nil
self.locked_by = nil
end
-
+
def self.work_off(num = 100)
success, failure = 0, 0
-
+
num.times do
-
+
job = self.reserve do |j|
begin
- j.perform
+ j.perform
success += 1
- rescue
+ rescue
failure += 1
raise
end
end
-
+
break if job.nil?
- end
-
+ end
+
return [success, failure]
end
-
+
private
-
- def deserialize(source)
+
+ def deserialize(source)
attempt_to_load_file = true
-
- begin
- handler = YAML.load(source) rescue nil
- return handler if handler.respond_to?(:perform)
-
+
+ begin
+ handler = YAML.load(source) rescue nil
+ return handler if handler.respond_to?(:perform)
+
if handler.nil?
if source =~ ParseObjectFromYaml
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
- return handler if handler.respond_to?(:perform)
+ return handler if handler.respond_to?(:perform)
end
end
-
+
if handler.is_a?(YAML::Object)
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
-
- raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-
+
+ raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+
rescue TypeError, LoadError, NameError => e
-
- raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
+
+ raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
end
-
+
def attempt_to_load(klass)
- klass.constantize
+ klass.constantize
end
def self.db_time_now
- (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
-
- protected
-
+
+ protected
+
def before_save
self.run_at ||= self.class.db_time_now
- end
-
+ end
+
end
end
@@ -1,7 +1,7 @@
module Delayed
module MessageSending
- def send_later(method, *args)
+ def send_later(method, *args)
Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
end
- end
+ end
end
@@ -1,40 +1,47 @@
module Delayed
- class PerformableMethod < Struct.new(:object, :method, :args)
- AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
-
+ class PerformableMethod < Struct.new(:object, :method, :args)
+ CLASS_STRING_FORMAT = /^CLASS\:([A-Z]\w+)$/
+ AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
+
def initialize(object, method, args)
raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method)
-
+
self.object = dump(object)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end
-
+
def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
- true
- end
-
+ true
+ end
+
private
def load(arg)
case arg
- when AR_STRING_FORMAT then $1.constantize.find($2)
+ when CLASS_STRING_FORMAT then $1.constantize
+ when AR_STRING_FORMAT then $1.constantize.find($2)
else arg
end
end
-
+
def dump(arg)
case arg
+ when Class then class_to_string(arg)
when ActiveRecord::Base then ar_to_string(arg)
else arg
- end
+ end
end
-
+
def ar_to_string(obj)
"AR:#{obj.class}:#{obj.id}"
- end
+ end
+
+ def class_to_string(obj)
+ "CLASS:#{obj.name}"
+ end
end
end
Oops, something went wrong. Retry.

0 comments on commit b9ebbb4

Please sign in to comment.