diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..c111b3313 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.gem diff --git a/README.textile b/README.textile index f6c3c5c6c..c09d62f54 100644 --- a/README.textile +++ b/README.textile @@ -11,13 +11,6 @@ It is a direct extraction from Shopify where the job table is responsible for a * updating solr, our search server, after product changes * batch imports * spam checks - -h2. Changes - -* 1.7 Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month. -* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute. -* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing. -* 1.0 Initial release h2. Setup @@ -74,3 +67,15 @@ You can also run by writing a simple @script/job_runner@, and invoking it extern h3. Cleaning up You can invoke @rake jobs:clear@ to delete all jobs in the queue. + +h3. Changes + +* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month. + +* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute. + +* 1.5.0: Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking instead of relying on row level locks. This enables us to run as many worker processes as we need to speed up queue processing. + +* 1.2.0: Added #send_later to Object for simpler job creation + +* 1.0.0: Initial release diff --git a/delayed_job.gemspec b/delayed_job.gemspec new file mode 100644 index 000000000..851ca0d6d --- /dev/null +++ b/delayed_job.gemspec @@ -0,0 +1,40 @@ +version = File.read('README.textile').scan(/^\*\s+([\d\.]+)/).flatten + +Gem::Specification.new do |s| + s.name = "delayed_job" + s.version = version.first + s.date = "2008-11-28" + s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" + s.email = "tobi@leetsoft.com" + s.homepage = "http://github.com/tobi/delayed_job/tree/master" + s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks." + s.authors = ["Tobias Lütke"] + + # s.bindir = "bin" + # s.executables = ["delayed_job"] + # s.default_executable = "delayed_job" + + s.has_rdoc = false + s.rdoc_options = ["--main", "README.textile"] + s.extra_rdoc_files = ["README.textile"] + + # run git ls-files to get an updated list + s.files = %w[ + MIT-LICENSE + README.textile + delayed_job.gemspec + init.rb + lib/delayed/job.rb + lib/delayed/message_sending.rb + lib/delayed/performable_method.rb + lib/delayed/worker.rb + lib/delayed_job.rb + tasks/jobs.rake + ] + s.test_files = %w[ + spec/database.rb + spec/delayed_method_spec.rb + spec/job_spec.rb + spec/story_spec.rb + ] +end diff --git a/init.rb b/init.rb index 765138bef..a816d7ee8 100644 --- a/init.rb +++ b/init.rb @@ -1,5 +1 @@ -require File.dirname(__FILE__) + '/lib/delayed/message_sending' -require File.dirname(__FILE__) + '/lib/delayed/performable_method' -require File.dirname(__FILE__) + '/lib/delayed/job' - -Object.send(:include, Delayed::MessageSending) \ No newline at end of file +require File.dirname(__FILE__) + '/lib/delayed_job' diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index 83d28cb17..f826efcc0 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base cattr_accessor :destroy_failed_jobs self.destroy_failed_jobs = true - # Every worker has a unique name which by default is the pid of the process. - # There are some advantages to overriding this with something which survives worker retarts: + # Every worker has a unique name which by default is the pid of the process. + # There are some advantages to overriding this with something which survives worker retarts: # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before. cattr_accessor :worker_name self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}" @@ -25,10 +25,10 @@ class Job < ActiveRecord::Base NextTaskOrder = 'priority DESC, run_at ASC' ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ - + cattr_accessor :min_priority, :max_priority self.min_priority = nil - self.max_priority = nil + self.max_priority = nil class LockError < StandardError end @@ -45,8 +45,8 @@ def failed? def payload_object @payload_object ||= deserialize(self['handler']) end - - def name + + def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) @@ -90,32 +90,32 @@ def self.enqueue(*args, &block) end def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) - - time_now = db_time_now - + + time_now = db_time_now + sql = NextTaskSQL.dup conditions = [time_now, time_now - max_run_time, worker_name] - + if self.min_priority sql << ' AND (priority >= ?)' conditions << min_priority end - + if self.max_priority sql << ' AND (priority <= ?)' - conditions << max_priority + conditions << max_priority end - conditions.unshift(sql) - + conditions.unshift(sql) + records = ActiveRecord::Base.silence do find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) end - + records.sort { rand() } - end - + end + # Get the payload of the next job we can get an exclusive lock on. # If no jobs are left we return nil def self.reserve(max_run_time = MAX_RUN_TIME, &block) @@ -136,7 +136,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block) rescue LockError # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e + rescue StandardError => e job.reschedule e.message, e.backtrace log_exception(job, e) return job @@ -154,16 +154,16 @@ def lock_exclusively!(max_run_time, worker = worker_name) # We don't own this job so we will update the locked_by name and the locked_at self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) else - # We already own this job, this may happen if the job queue crashes. + # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 - + self.locked_at = now self.locked_by = worker end - + def unlock self.locked_at = nil self.locked_by = nil @@ -179,7 +179,6 @@ def self.work_off(num = 100) success, failure = 0, 0 num.times do - job = self.reserve do |j| begin j.perform @@ -194,56 +193,37 @@ def self.work_off(num = 100) end return [success, failure] - end - - + end + # Moved into its own method so that new_relic can trace it. def self.invoke_job(job, &block) block.call(job) end - - private + private def deserialize(source) - attempt_to_load_file = true - - begin - handler = YAML.load(source) rescue nil - return handler if handler.respond_to?(:perform) - - if handler.nil? - if source =~ ParseObjectFromYaml - - # Constantize the object so that ActiveSupport can attempt - # its auto loading magic. Will raise LoadError if not successful. - attempt_to_load($1) + handler = YAML.load(source) rescue nil - # If successful, retry the yaml.load - handler = YAML.load(source) - return handler if handler.respond_to?(:perform) - end - end - - if handler.is_a?(YAML::Object) - - # Constantize the object so that ActiveSupport can attempt - # its auto loading magic. Will raise LoadError if not successful. - attempt_to_load(handler.class) - - # If successful, retry the yaml.load - handler = YAML.load(source) - return handler if handler.respond_to?(:perform) + unless handler.respond_to?(:perform) + if handler.nil? && source =~ ParseObjectFromYaml + handler_class = $1 end + attempt_to_load(handler_class || handler.class) + handler = YAML.load(source) + end - raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' - - rescue TypeError, LoadError, NameError => e + return handler if handler.respond_to?(:perform) - raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file." - end + raise DeserializationError, + 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' + rescue TypeError, LoadError, NameError => e + raise DeserializationError, + "Job failed to load: #{e.message}. Try to manually require the required file." end + # Constantize the object so that ActiveSupport can attempt + # its auto loading magic. Will raise LoadError if not successful. def attempt_to_load(klass) klass.constantize end @@ -252,7 +232,7 @@ def self.db_time_now (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now end - protected + protected def before_save self.run_at ||= self.class.db_time_now diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index b0888f781..de471ce84 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -2,19 +2,21 @@ module Delayed class Worker SLEEP = 5 + cattr_accessor :logger + self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER) + def initialize(options={}) - @quiet = options[:quiet] + @quiet = options[:quiet] Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority) Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority) - end + end def start say "*** Starting job worker #{Delayed::Job.worker_name}" trap('TERM') { say 'Exiting...'; $exit = true } trap('INT') { say 'Exiting...'; $exit = true } - - + loop do result = nil @@ -33,15 +35,15 @@ def start end break if $exit - end - + end + ensure Delayed::Job.clear_locks! end - + def say(text) puts text unless @quiet - RAILS_DEFAULT_LOGGER.info text + logger.info text if logger end end diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb new file mode 100644 index 000000000..8affb189b --- /dev/null +++ b/lib/delayed_job.rb @@ -0,0 +1,6 @@ +require File.dirname(__FILE__) + '/delayed/message_sending' +require File.dirname(__FILE__) + '/delayed/performable_method' +require File.dirname(__FILE__) + '/delayed/job' +require File.dirname(__FILE__) + '/delayed/worker' + +Object.send(:include, Delayed::MessageSending) \ No newline at end of file diff --git a/spec/database.rb b/spec/database.rb index ea7fa3e35..20ae53da5 100644 --- a/spec/database.rb +++ b/spec/database.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'active_record' +gem 'sqlite3-ruby' require File.dirname(__FILE__) + '/../init' require 'spec' diff --git a/spec/job_spec.rb b/spec/job_spec.rb index a19a565e2..7d7a735dc 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -283,7 +283,7 @@ def perform; @@runs += 1; end it "should leave the queue in a consistent state and not run the job if locking fails" do SimpleJob.runs.should == 0 - @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError) + @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError) Delayed::Job.should_receive(:find_available).once.and_return([@job]) Delayed::Job.work_off(1) SimpleJob.runs.should == 0