Permalink
Browse files

Merge branch 'thumblemonks/master'

Conflicts:

	lib/delayed/job.rb
  • Loading branch information...
2 parents d644b3d + f2ea93c commit a71604e2a62dfce25a7b4585a72be3df775915e2 @tobi committed Dec 16, 2008
Showing with 112 additions and 81 deletions.
  1. +1 −0 .gitignore
  2. +12 −7 README.textile
  3. +40 −0 delayed_job.gemspec
  4. +1 −5 init.rb
  5. +40 −60 lib/delayed/job.rb
  6. +10 −8 lib/delayed/worker.rb
  7. +6 −0 lib/delayed_job.rb
  8. +1 −0 spec/database.rb
  9. +1 −1 spec/job_spec.rb
View
@@ -0,0 +1 @@
+*.gem
View
@@ -11,13 +11,6 @@ It is a direct extraction from Shopify where the job table is responsible for a
* updating solr, our search server, after product changes
* batch imports
* spam checks
-
-h2. Changes
-
-* 1.7 Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.
-* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
-* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
-* 1.0 Initial release
h2. Setup
@@ -74,3 +67,15 @@ You can also run by writing a simple @script/job_runner@, and invoking it extern
h3. Cleaning up
You can invoke @rake jobs:clear@ to delete all jobs in the queue.
+
+h3. Changes
+
+* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.
+
+* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
+
+* 1.5.0: Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking instead of relying on row level locks. This enables us to run as many worker processes as we need to speed up queue processing.
+
+* 1.2.0: Added #send_later to Object for simpler job creation
+
+* 1.0.0: Initial release
View
@@ -0,0 +1,40 @@
+version = File.read('README.textile').scan(/^\*\s+([\d\.]+)/).flatten
+
+Gem::Specification.new do |s|
+ s.name = "delayed_job"
+ s.version = version.first
+ s.date = "2008-11-28"
+ s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
+ s.email = "tobi@leetsoft.com"
+ s.homepage = "http://github.com/tobi/delayed_job/tree/master"
+ s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks."
+ s.authors = ["Tobias Lütke"]
+
+ # s.bindir = "bin"
+ # s.executables = ["delayed_job"]
+ # s.default_executable = "delayed_job"
+
+ s.has_rdoc = false
+ s.rdoc_options = ["--main", "README.textile"]
+ s.extra_rdoc_files = ["README.textile"]
+
+ # run git ls-files to get an updated list
+ s.files = %w[
+ MIT-LICENSE
+ README.textile
+ delayed_job.gemspec
+ init.rb
+ lib/delayed/job.rb
+ lib/delayed/message_sending.rb
+ lib/delayed/performable_method.rb
+ lib/delayed/worker.rb
+ lib/delayed_job.rb
+ tasks/jobs.rake
+ ]
+ s.test_files = %w[
+ spec/database.rb
+ spec/delayed_method_spec.rb
+ spec/job_spec.rb
+ spec/story_spec.rb
+ ]
+end
View
@@ -1,5 +1 @@
-require File.dirname(__FILE__) + '/lib/delayed/message_sending'
-require File.dirname(__FILE__) + '/lib/delayed/performable_method'
-require File.dirname(__FILE__) + '/lib/delayed/job'
-
-Object.send(:include, Delayed::MessageSending)
+require File.dirname(__FILE__) + '/lib/delayed_job'
View
@@ -15,8 +15,8 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
- # Every worker has a unique name which by default is the pid of the process.
- # There are some advantages to overriding this with something which survives worker retarts:
+ # Every worker has a unique name which by default is the pid of the process.
+ # There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
@@ -25,10 +25,10 @@ class Job < ActiveRecord::Base
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
-
+
cattr_accessor :min_priority, :max_priority
self.min_priority = nil
- self.max_priority = nil
+ self.max_priority = nil
class LockError < StandardError
end
@@ -45,8 +45,8 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
-
- def name
+
+ def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
@@ -90,32 +90,32 @@ def self.enqueue(*args, &block)
end
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
-
- time_now = db_time_now
-
+
+ time_now = db_time_now
+
sql = NextTaskSQL.dup
conditions = [time_now, time_now - max_run_time, worker_name]
-
+
if self.min_priority
sql << ' AND (priority >= ?)'
conditions << min_priority
end
-
+
if self.max_priority
sql << ' AND (priority <= ?)'
- conditions << max_priority
+ conditions << max_priority
end
- conditions.unshift(sql)
-
+ conditions.unshift(sql)
+
records = ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
-
+
records.sort { rand() }
- end
-
+ end
+
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
@@ -136,7 +136,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
- rescue StandardError => e
+ rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
@@ -154,16 +154,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
- # We already own this job, this may happen if the job queue crashes.
+ # We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
-
+
self.locked_at = now
self.locked_by = worker
end
-
+
def unlock
self.locked_at = nil
self.locked_by = nil
@@ -179,7 +179,6 @@ def self.work_off(num = 100)
success, failure = 0, 0
num.times do
-
job = self.reserve do |j|
begin
j.perform
@@ -194,56 +193,37 @@ def self.work_off(num = 100)
end
return [success, failure]
- end
-
-
+ end
+
# Moved into its own method so that new_relic can trace it.
def self.invoke_job(job, &block)
block.call(job)
end
-
- private
+ private
def deserialize(source)
- attempt_to_load_file = true
-
- begin
- handler = YAML.load(source) rescue nil
- return handler if handler.respond_to?(:perform)
-
- if handler.nil?
- if source =~ ParseObjectFromYaml
-
- # Constantize the object so that ActiveSupport can attempt
- # its auto loading magic. Will raise LoadError if not successful.
- attempt_to_load($1)
+ handler = YAML.load(source) rescue nil
- # If successful, retry the yaml.load
- handler = YAML.load(source)
- return handler if handler.respond_to?(:perform)
- end
- end
-
- if handler.is_a?(YAML::Object)
-
- # Constantize the object so that ActiveSupport can attempt
- # its auto loading magic. Will raise LoadError if not successful.
- attempt_to_load(handler.class)
-
- # If successful, retry the yaml.load
- handler = YAML.load(source)
- return handler if handler.respond_to?(:perform)
+ unless handler.respond_to?(:perform)
+ if handler.nil? && source =~ ParseObjectFromYaml
+ handler_class = $1
end
+ attempt_to_load(handler_class || handler.class)
+ handler = YAML.load(source)
+ end
- raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-
- rescue TypeError, LoadError, NameError => e
+ return handler if handler.respond_to?(:perform)
- raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
- end
+ raise DeserializationError,
+ 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+ rescue TypeError, LoadError, NameError => e
+ raise DeserializationError,
+ "Job failed to load: #{e.message}. Try to manually require the required file."
end
+ # Constantize the object so that ActiveSupport can attempt
+ # its auto loading magic. Will raise LoadError if not successful.
def attempt_to_load(klass)
klass.constantize
end
@@ -252,7 +232,7 @@ def self.db_time_now
(ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
- protected
+ protected
def before_save
self.run_at ||= self.class.db_time_now
View
@@ -2,19 +2,21 @@ module Delayed
class Worker
SLEEP = 5
+ cattr_accessor :logger
+ self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER)
+
def initialize(options={})
- @quiet = options[:quiet]
+ @quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
- end
+ end
def start
say "*** Starting job worker #{Delayed::Job.worker_name}"
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
-
-
+
loop do
result = nil
@@ -33,15 +35,15 @@ def start
end
break if $exit
- end
-
+ end
+
ensure
Delayed::Job.clear_locks!
end
-
+
def say(text)
puts text unless @quiet
- RAILS_DEFAULT_LOGGER.info text
+ logger.info text if logger
end
end
View
@@ -0,0 +1,6 @@
+require File.dirname(__FILE__) + '/delayed/message_sending'
+require File.dirname(__FILE__) + '/delayed/performable_method'
+require File.dirname(__FILE__) + '/delayed/job'
+require File.dirname(__FILE__) + '/delayed/worker'
+
+Object.send(:include, Delayed::MessageSending)
View
@@ -3,6 +3,7 @@
require 'rubygems'
require 'active_record'
+gem 'sqlite3-ruby'
require File.dirname(__FILE__) + '/../init'
require 'spec'
View
@@ -283,7 +283,7 @@ def perform; @@runs += 1; end
it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
- @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
+ @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Delayed::Job.work_off(1)
SimpleJob.runs.should == 0

0 comments on commit a71604e

Please sign in to comment.