From bcf8d1d2decd70b59b63640263bfde9a12959c5c Mon Sep 17 00:00:00 2001 From: Justin Knowlden Date: Fri, 28 Nov 2008 22:34:06 -0600 Subject: [PATCH 1/5] Factored duplication out of Job#deserialize. Made roodi happier about its cyclomatic complexity as well. --- lib/delayed/job.rb | 58 +++++++++++++++------------------------------- spec/database.rb | 1 + spec/job_spec.rb | 2 +- 3 files changed, 21 insertions(+), 40 deletions(-) diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index ac7298fd3..b2b5c0240 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -185,7 +185,6 @@ def self.work_off(num = 100) success, failure = 0, 0 num.times do - job = self.reserve do |j| begin j.perform @@ -200,56 +199,37 @@ def self.work_off(num = 100) end return [success, failure] - end - - + end + # Moved into its own method so that new_relic can trace it. def self.invoke_job(job, &block) block.call(job) end - - private + private def deserialize(source) - attempt_to_load_file = true - - begin - handler = YAML.load(source) rescue nil - return handler if handler.respond_to?(:perform) - - if handler.nil? - if source =~ ParseObjectFromYaml - - # Constantize the object so that ActiveSupport can attempt - # its auto loading magic. Will raise LoadError if not successful. - attempt_to_load($1) + handler = YAML.load(source) rescue nil - # If successful, retry the yaml.load - handler = YAML.load(source) - return handler if handler.respond_to?(:perform) - end - end - - if handler.is_a?(YAML::Object) - - # Constantize the object so that ActiveSupport can attempt - # its auto loading magic. Will raise LoadError if not successful. - attempt_to_load(handler.class) - - # If successful, retry the yaml.load - handler = YAML.load(source) - return handler if handler.respond_to?(:perform) + unless handler.respond_to?(:perform) + if handler.nil? && source =~ ParseObjectFromYaml + handler_class = $1 end + attempt_to_load(handler_class || handler.class) + handler = YAML.load(source) + end - raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' - - rescue TypeError, LoadError, NameError => e + return handler if handler.respond_to?(:perform) - raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file." - end + raise DeserializationError, + 'Job failed to load: Unknown handler. Try to manually require the appropiate file.' + rescue TypeError, LoadError, NameError => e + raise DeserializationError, + "Job failed to load: #{e.message}. Try to manually require the required file." end + # Constantize the object so that ActiveSupport can attempt + # its auto loading magic. Will raise LoadError if not successful. def attempt_to_load(klass) klass.constantize end @@ -258,7 +238,7 @@ def self.db_time_now (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now end - protected + protected def before_save self.run_at ||= self.class.db_time_now diff --git a/spec/database.rb b/spec/database.rb index ea7fa3e35..20ae53da5 100644 --- a/spec/database.rb +++ b/spec/database.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'active_record' +gem 'sqlite3-ruby' require File.dirname(__FILE__) + '/../init' require 'spec' diff --git a/spec/job_spec.rb b/spec/job_spec.rb index a19a565e2..7d7a735dc 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -283,7 +283,7 @@ def perform; @@runs += 1; end it "should leave the queue in a consistent state and not run the job if locking fails" do SimpleJob.runs.should == 0 - @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError) + @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError) Delayed::Job.should_receive(:find_available).once.and_return([@job]) Delayed::Job.work_off(1) SimpleJob.runs.should == 0 From 9ba6b0872375b55eadf9979b8773efe9132bda2c Mon Sep 17 00:00:00 2001 From: Justin Knowlden Date: Sat, 29 Nov 2008 00:06:07 -0600 Subject: [PATCH 2/5] Made a gem out of delayed_job so I can use it in my services. Had to modify Worker to not infer DJ is running in a Rails instance. --- .gitignore | 1 + HISTORY.txt | 2 ++ delayed_job.gemspec | 39 ++++++++++++++++++++++++++++++++++++ init.rb | 6 +----- lib/delayed/job.rb | 46 +++++++++++++++++++++---------------------- lib/delayed/worker.rb | 18 +++++++++-------- lib/delayed_job.rb | 6 ++++++ 7 files changed, 82 insertions(+), 36 deletions(-) create mode 100644 .gitignore create mode 100644 HISTORY.txt create mode 100644 delayed_job.gemspec create mode 100644 lib/delayed_job.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..c111b3313 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.gem diff --git a/HISTORY.txt b/HISTORY.txt new file mode 100644 index 000000000..68fc4176a --- /dev/null +++ b/HISTORY.txt @@ -0,0 +1,2 @@ +== 0.1.0 / 2008-11-28 + * First of many versions diff --git a/delayed_job.gemspec b/delayed_job.gemspec new file mode 100644 index 000000000..12892fb89 --- /dev/null +++ b/delayed_job.gemspec @@ -0,0 +1,39 @@ +Gem::Specification.new do |s| + s.name = "delayed_job" + s.version = "0.1.0" + s.date = "2008-11-28" + s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" + s.email = "tobi@leetsoft.com" + s.homepage = "http://github.com/tobi/delayed_job/tree/master" + s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks." + s.authors = ["Tobias Lütke", "Justin Knowlden"] + + # s.bindir = "bin" + # s.executables = ["delayed_job"] + # s.default_executable = "delayed_job" + + s.has_rdoc = false + s.rdoc_options = ["--main", "README.textile"] + s.extra_rdoc_files = ["HISTORY.txt", "README.textile"] + + # run git ls-files to get an updated list + s.files = %w[ + HISTORY.txt + MIT-LICENSE + README.textile + delayed_job.gemspec + init.rb + lib/delayed/job.rb + lib/delayed/message_sending.rb + lib/delayed/performable_method.rb + lib/delayed/worker.rb + lib/delayed_job.rb + tasks/jobs.rake + ] + s.test_files = %w[ + spec/database.rb + spec/delayed_method_spec.rb + spec/job_spec.rb + spec/story_spec.rb + ] +end diff --git a/init.rb b/init.rb index 765138bef..a816d7ee8 100644 --- a/init.rb +++ b/init.rb @@ -1,5 +1 @@ -require File.dirname(__FILE__) + '/lib/delayed/message_sending' -require File.dirname(__FILE__) + '/lib/delayed/performable_method' -require File.dirname(__FILE__) + '/lib/delayed/job' - -Object.send(:include, Delayed::MessageSending) \ No newline at end of file +require File.dirname(__FILE__) + '/lib/delayed_job' diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index b2b5c0240..f6913dd53 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base cattr_accessor :destroy_failed_jobs self.destroy_failed_jobs = true - # Every worker has a unique name which by default is the pid of the process. - # There are some advantages to overriding this with something which survives worker retarts: + # Every worker has a unique name which by default is the pid of the process. + # There are some advantages to overriding this with something which survives worker retarts: # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before. cattr_accessor :worker_name self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}" @@ -25,10 +25,10 @@ class Job < ActiveRecord::Base NextTaskOrder = 'priority DESC, run_at ASC' ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ - + cattr_accessor :min_priority, :max_priority self.min_priority = nil - self.max_priority = nil + self.max_priority = nil class LockError < StandardError end @@ -45,8 +45,8 @@ def failed? def payload_object @payload_object ||= deserialize(self['handler']) end - - def name + + def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) @@ -80,13 +80,13 @@ def self.enqueue(*args, &block) if block_given? priority = args.first || 0 run_at = args.second - + Job.create(:payload_object => EvaledJob.new(&block), :priority => priority.to_i, :run_at => run_at) else object = args.first priority = args.second || 0 run_at = args.third - + unless object.respond_to?(:perform) raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end @@ -96,32 +96,32 @@ def self.enqueue(*args, &block) end def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) - - time_now = db_time_now - + + time_now = db_time_now + sql = NextTaskSQL.dup conditions = [time_now, time_now - max_run_time, worker_name] - + if self.min_priority sql << ' AND (priority >= ?)' conditions << min_priority end - + if self.max_priority sql << ' AND (priority <= ?)' - conditions << max_priority + conditions << max_priority end - conditions.unshift(sql) - + conditions.unshift(sql) + records = ActiveRecord::Base.silence do find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) end - + records.sort { rand() } - end - + end + # Get the payload of the next job we can get an exclusive lock on. # If no jobs are left we return nil def self.reserve(max_run_time = MAX_RUN_TIME, &block) @@ -142,7 +142,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block) rescue LockError # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e + rescue StandardError => e job.reschedule e.message, e.backtrace log_exception(job, e) return job @@ -160,16 +160,16 @@ def lock_exclusively!(max_run_time, worker = worker_name) # We don't own this job so we will update the locked_by name and the locked_at self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) else - # We already own this job, this may happen if the job queue crashes. + # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 - + self.locked_at = now self.locked_by = worker end - + def unlock self.locked_at = nil self.locked_by = nil diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index b0888f781..de471ce84 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -2,19 +2,21 @@ module Delayed class Worker SLEEP = 5 + cattr_accessor :logger + self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER) + def initialize(options={}) - @quiet = options[:quiet] + @quiet = options[:quiet] Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority) Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority) - end + end def start say "*** Starting job worker #{Delayed::Job.worker_name}" trap('TERM') { say 'Exiting...'; $exit = true } trap('INT') { say 'Exiting...'; $exit = true } - - + loop do result = nil @@ -33,15 +35,15 @@ def start end break if $exit - end - + end + ensure Delayed::Job.clear_locks! end - + def say(text) puts text unless @quiet - RAILS_DEFAULT_LOGGER.info text + logger.info text if logger end end diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb new file mode 100644 index 000000000..8affb189b --- /dev/null +++ b/lib/delayed_job.rb @@ -0,0 +1,6 @@ +require File.dirname(__FILE__) + '/delayed/message_sending' +require File.dirname(__FILE__) + '/delayed/performable_method' +require File.dirname(__FILE__) + '/delayed/job' +require File.dirname(__FILE__) + '/delayed/worker' + +Object.send(:include, Delayed::MessageSending) \ No newline at end of file From 4946d39779f7f16c8bb501af1599e5b2c0b09703 Mon Sep 17 00:00:00 2001 From: Justin Knowlden Date: Sat, 6 Dec 2008 16:26:48 -0600 Subject: [PATCH 3/5] Updating rev to see if github will build the damn gem now --- delayed_job.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delayed_job.gemspec b/delayed_job.gemspec index 12892fb89..0f648b270 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = "delayed_job" - s.version = "0.1.0" + s.version = "0.1.1" s.date = "2008-11-28" s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" s.email = "tobi@leetsoft.com" From b419b69201ce089039e4d4d3061104d14fe72759 Mon Sep 17 00:00:00 2001 From: Gabriel Gironda Date: Sat, 6 Dec 2008 16:32:00 -0600 Subject: [PATCH 4/5] IM A GEM BUILDING MACHINE --- delayed_job.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delayed_job.gemspec b/delayed_job.gemspec index 0f648b270..ebe72210d 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = "delayed_job" - s.version = "0.1.1" + s.version = "0.1.7" s.date = "2008-11-28" s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" s.email = "tobi@leetsoft.com" From f2ea93c1e565c95eac173fdda53a6feeadfe9aa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20L=C3=BCtke?= Date: Tue, 16 Dec 2008 09:26:10 -0500 Subject: [PATCH 5/5] Small changes in the way th egems are build --- HISTORY.txt | 2 -- README.textile | 19 ++++++++++++------- delayed_job.gemspec | 9 +++++---- 3 files changed, 17 insertions(+), 13 deletions(-) delete mode 100644 HISTORY.txt diff --git a/HISTORY.txt b/HISTORY.txt deleted file mode 100644 index 68fc4176a..000000000 --- a/HISTORY.txt +++ /dev/null @@ -1,2 +0,0 @@ -== 0.1.0 / 2008-11-28 - * First of many versions diff --git a/README.textile b/README.textile index f6c3c5c6c..c09d62f54 100644 --- a/README.textile +++ b/README.textile @@ -11,13 +11,6 @@ It is a direct extraction from Shopify where the job table is responsible for a * updating solr, our search server, after product changes * batch imports * spam checks - -h2. Changes - -* 1.7 Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month. -* 1.6 Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute. -* 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing. -* 1.0 Initial release h2. Setup @@ -74,3 +67,15 @@ You can also run by writing a simple @script/job_runner@, and invoking it extern h3. Cleaning up You can invoke @rake jobs:clear@ to delete all jobs in the queue. + +h3. Changes + +* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month. + +* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute. + +* 1.5.0: Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking instead of relying on row level locks. This enables us to run as many worker processes as we need to speed up queue processing. + +* 1.2.0: Added #send_later to Object for simpler job creation + +* 1.0.0: Initial release diff --git a/delayed_job.gemspec b/delayed_job.gemspec index ebe72210d..851ca0d6d 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -1,12 +1,14 @@ +version = File.read('README.textile').scan(/^\*\s+([\d\.]+)/).flatten + Gem::Specification.new do |s| s.name = "delayed_job" - s.version = "0.1.7" + s.version = version.first s.date = "2008-11-28" s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" s.email = "tobi@leetsoft.com" s.homepage = "http://github.com/tobi/delayed_job/tree/master" s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks." - s.authors = ["Tobias Lütke", "Justin Knowlden"] + s.authors = ["Tobias Lütke"] # s.bindir = "bin" # s.executables = ["delayed_job"] @@ -14,11 +16,10 @@ Gem::Specification.new do |s| s.has_rdoc = false s.rdoc_options = ["--main", "README.textile"] - s.extra_rdoc_files = ["HISTORY.txt", "README.textile"] + s.extra_rdoc_files = ["README.textile"] # run git ls-files to get an updated list s.files = %w[ - HISTORY.txt MIT-LICENSE README.textile delayed_job.gemspec