From ecfc1e436d4c9b991f46ca2d3d732a6fb1060007 Mon Sep 17 00:00:00 2001 From: guns Date: Tue, 7 Sep 2010 03:12:15 -0500 Subject: [PATCH 01/10] temporary fix for http://github.com/collectiveidea/delayed_job/issues/issue/113 rename PerformableMethod#method -> #delayed_method to avoid overriding Object#method --- lib/delayed/performable_mailer.rb | 2 +- lib/delayed/performable_method.rb | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/delayed/performable_mailer.rb b/lib/delayed/performable_mailer.rb index 19cb7266e..a5b14aeab 100644 --- a/lib/delayed/performable_mailer.rb +++ b/lib/delayed/performable_mailer.rb @@ -3,7 +3,7 @@ module Delayed class PerformableMailer < PerformableMethod def perform - object.send(method, *args).deliver + object.send(delayed_method, *args).deliver end end end diff --git a/lib/delayed/performable_method.rb b/lib/delayed/performable_method.rb index bc5908921..3c67d1d13 100644 --- a/lib/delayed/performable_method.rb +++ b/lib/delayed/performable_method.rb @@ -1,19 +1,19 @@ module Delayed - class PerformableMethod < Struct.new(:object, :method, :args) - def initialize(object, method, args) - raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method, true) + class PerformableMethod < Struct.new(:object, :delayed_method, :args) + def initialize(object, delayed_method, args) + raise NoMethodError, "undefined method `#{delayed_method}' for #{object.inspect}" unless object.respond_to?(delayed_method, true) self.object = object self.args = args - self.method = method.to_sym + self.delayed_method = delayed_method.to_sym end def display_name - "#{object.class}##{method}" + "#{object.class}##{delayed_method}" end def perform - object.send(method, *args) if object + object.send(delayed_method, *args) if object end def method_missing(symbol, *args) From 76795ea02e457a6bffaf6ecef3bb3f98f9adae06 Mon Sep 17 00:00:00 2001 From: guns Date: Thu, 9 Sep 2010 03:50:22 -0500 Subject: [PATCH 02/10] Revert "temporary fix for http://github.com/collectiveidea/delayed_job/issues/issue/113" This reverts commit ecfc1e436d4c9b991f46ca2d3d732a6fb1060007. --- lib/delayed/performable_mailer.rb | 2 +- lib/delayed/performable_method.rb | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/delayed/performable_mailer.rb b/lib/delayed/performable_mailer.rb index a5b14aeab..19cb7266e 100644 --- a/lib/delayed/performable_mailer.rb +++ b/lib/delayed/performable_mailer.rb @@ -3,7 +3,7 @@ module Delayed class PerformableMailer < PerformableMethod def perform - object.send(delayed_method, *args).deliver + object.send(method, *args).deliver end end end diff --git a/lib/delayed/performable_method.rb b/lib/delayed/performable_method.rb index 3c67d1d13..bc5908921 100644 --- a/lib/delayed/performable_method.rb +++ b/lib/delayed/performable_method.rb @@ -1,19 +1,19 @@ module Delayed - class PerformableMethod < Struct.new(:object, :delayed_method, :args) - def initialize(object, delayed_method, args) - raise NoMethodError, "undefined method `#{delayed_method}' for #{object.inspect}" unless object.respond_to?(delayed_method, true) + class PerformableMethod < Struct.new(:object, :method, :args) + def initialize(object, method, args) + raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method, true) self.object = object self.args = args - self.delayed_method = delayed_method.to_sym + self.method = method.to_sym end def display_name - "#{object.class}##{delayed_method}" + "#{object.class}##{method}" end def perform - object.send(delayed_method, *args) if object + object.send(method, *args) if object end def method_missing(symbol, *args) From 7093c261dc5008072c8409e2b7b1283f23627d07 Mon Sep 17 00:00:00 2001 From: Matt Griffin Date: Thu, 8 Jul 2010 10:17:18 -0400 Subject: [PATCH 03/10] Changed @@sleep_delay to self.class.sleep_delay to be consistent with other class variable usage --- lib/delayed/worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 450582e8b..67cb71563 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -80,7 +80,7 @@ def start break if $exit if count.zero? - sleep(@@sleep_delay) + sleep(self.class.sleep_delay) else say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last] end From 34fc406e44839cc1f6990d775f4a79ff7ebc5506 Mon Sep 17 00:00:00 2001 From: Matt Griffin Date: Fri, 30 Jul 2010 17:24:51 -0400 Subject: [PATCH 04/10] Add support for sleep_delay command line option --- lib/delayed/command.rb | 5 +++-- lib/delayed/worker.rb | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index b844ba7f9..5caa96385 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -44,8 +44,9 @@ def initialize(args) opts.on('-m', '--monitor', 'Start monitor process.') do @monitor = true end - - + opts.on('--sleep-delay N', "Amount of time to sleep when no jobs are found") do |n| + @options[:sleep_delay] = n + end end @args = opts.parse!(args) end diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 67cb71563..a6960ce85 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -45,6 +45,7 @@ def initialize(options={}) @quiet = options.has_key?(:quiet) ? options[:quiet] : true self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority) self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority) + self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay) end # Every worker has a unique name which by default is the pid of the process. There are some From 1fea4b06336f3cd26896bd79be10d78fd6bec201 Mon Sep 17 00:00:00 2001 From: Matt Griffin Date: Thu, 23 Sep 2010 11:47:21 -0400 Subject: [PATCH 05/10] Reschedule jobs using a time provided by the handler, or fall back to default --- lib/delayed/backend/base.rb | 7 +++- lib/delayed/backend/shared_spec.rb | 14 ++++++-- lib/delayed/worker.rb | 2 +- spec/sample_jobs.rb | 6 ++++ spec/worker_spec.rb | 51 ++++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 5 deletions(-) diff --git a/lib/delayed/backend/base.rb b/lib/delayed/backend/base.rb index 333cca201..c1acec7c1 100644 --- a/lib/delayed/backend/base.rb +++ b/lib/delayed/backend/base.rb @@ -96,12 +96,17 @@ def hook(name, *args) end end + def reschedule_at + payload_object.respond_to?(:reschedule_at) ? + payload_object.reschedule_at(self.class.db_time_now, attempts) : + self.class.db_time_now + (attempts ** 4) + 5 + end + protected def set_default_run_at self.run_at ||= self.class.db_time_now end - end end end diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index bd4ebf40d..0a7ad52ce 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -294,7 +294,7 @@ def create_job(opts = {}) @job.locked_at.should be_nil end end - + context "large handler" do before do text = "Lorem ipsum dolor sit amet. " * 1000 @@ -402,8 +402,8 @@ def create_job(opts = {}) # reset defaults Delayed::Worker.destroy_failed_jobs = true Delayed::Worker.max_attempts = 25 - - @job = Delayed::Job.enqueue ErrorJob.new + + @job = Delayed::Job.enqueue(ErrorJob.new) end it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do @@ -425,6 +425,14 @@ def create_job(opts = {}) @job.run_at.should > Delayed::Job.db_time_now - 10.minutes @job.run_at.should < Delayed::Job.db_time_now + 10.minutes end + + it 'should re-schedule with handler provided time if present' do + @job = Delayed::Job.enqueue(CustomRescheduleJob.new(99.minutes)) + @worker.run(@job) + @job.reload + + (Delayed::Job.db_time_now + 99.minutes - @job.run_at).abs.should < 1 + end end context "reschedule" do diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 450582e8b..eaee6609f 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -128,7 +128,7 @@ def run(job) # Uses an exponential scale depending on the number of failed attempts. def reschedule(job, time = nil) if (job.attempts += 1) < self.class.max_attempts - time ||= Job.db_time_now + (job.attempts ** 4) + 5 + time ||= job.reschedule_at job.run_at = time job.unlock job.save! diff --git a/spec/sample_jobs.rb b/spec/sample_jobs.rb index ba61d7f8c..6ffe11a02 100644 --- a/spec/sample_jobs.rb +++ b/spec/sample_jobs.rb @@ -14,6 +14,12 @@ class ErrorJob def perform; raise 'did not work'; end end +class CustomRescheduleJob < Struct.new(:offset) + cattr_accessor :runs; self.runs = 0 + def perform; raise 'did not work'; end + def reschedule_at(time, attempts); time + offset; end +end + class LongRunningJob def perform; sleep 250; end end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index 62bc70edf..3d46e5580 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -34,4 +34,55 @@ lambda { Delayed::Worker.guess_backend }.should_not change { Delayed::Worker.backend } end end + + describe "running a job" do + before(:each) do + @worker = Delayed::Worker.new + end + + after(:each) do + Delayed::Job.delete_all + end + + describe 'that fails' do + before(:each) do + @handler = ErrorJob.new + @job = Delayed::Job.enqueue(@handler) + end + + it 'should increase the attempts' do + @worker.run(@job) + @job.attempts.should == 1 + end + + it 'should reschedule the job in the future' do + @worker.run(@job) + @job.run_at.should > Job.db_time_now + 5 + end + + describe 'with custom rescheduling strategy' do + before(:each) do + @reschedule_at = Time.current + 7.hours + @handler.stub!(:reschedule_at).and_return(@reschedule_at) + end + + it 'should invoke the strategy' do + @handler.should_receive(:reschedule_at) do |time, attempts| + (Job.db_time_now - time).should < 2 + attempts.should == 1 + + Job.db_time.now + 5 + end + + @worker.run(@job) + end + + end + + it 'should reschedule at the specified time' do + @worker.run(@job) + @job.run_at.should == @reschedule_at + end + end + end end From 92d7e31bc452e41076a07967627912c4872426b0 Mon Sep 17 00:00:00 2001 From: Matt Griffin Date: Thu, 23 Sep 2010 15:39:42 -0400 Subject: [PATCH 06/10] Remove broken spec that should not have been committed --- spec/worker_spec.rb | 51 --------------------------------------------- 1 file changed, 51 deletions(-) diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index 3d46e5580..62bc70edf 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -34,55 +34,4 @@ lambda { Delayed::Worker.guess_backend }.should_not change { Delayed::Worker.backend } end end - - describe "running a job" do - before(:each) do - @worker = Delayed::Worker.new - end - - after(:each) do - Delayed::Job.delete_all - end - - describe 'that fails' do - before(:each) do - @handler = ErrorJob.new - @job = Delayed::Job.enqueue(@handler) - end - - it 'should increase the attempts' do - @worker.run(@job) - @job.attempts.should == 1 - end - - it 'should reschedule the job in the future' do - @worker.run(@job) - @job.run_at.should > Job.db_time_now + 5 - end - - describe 'with custom rescheduling strategy' do - before(:each) do - @reschedule_at = Time.current + 7.hours - @handler.stub!(:reschedule_at).and_return(@reschedule_at) - end - - it 'should invoke the strategy' do - @handler.should_receive(:reschedule_at) do |time, attempts| - (Job.db_time_now - time).should < 2 - attempts.should == 1 - - Job.db_time.now + 5 - end - - @worker.run(@job) - end - - end - - it 'should reschedule at the specified time' do - @worker.run(@job) - @job.run_at.should == @reschedule_at - end - end - end end From dca7dd745a576e25dcb68caa9ea6ba949d2663ac Mon Sep 17 00:00:00 2001 From: Brandon Keepers Date: Fri, 17 Sep 2010 14:43:12 -0400 Subject: [PATCH 07/10] Use mysql to run specs --- Gemfile.lock | 2 ++ delayed_job.gemspec | 1 + lib/delayed/backend/active_record.rb | 4 ++++ spec/database.yml | 4 ++++ spec/spec_helper.rb | 5 ++++- 5 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 spec/database.yml diff --git a/Gemfile.lock b/Gemfile.lock index 09bd7bcbe..3a8dfe678 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -49,6 +49,7 @@ GEM mime-types treetop (>= 1.4.5) mime-types (1.16) + mysql (2.8.1) polyglot (0.3.1) rack (1.2.1) rack-mount (0.6.13) @@ -88,6 +89,7 @@ DEPENDENCIES activesupport (~> 3.0) daemons delayed_job! + mysql rails (~> 3.0) rake rspec diff --git a/delayed_job.gemspec b/delayed_job.gemspec index adfa8ea85..f86429c72 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -24,5 +24,6 @@ This gem is collectiveidea's fork (http://github.com/collectiveidea/delayed_job) s.add_development_dependency 'rails', '~>3.0' s.add_development_dependency 'sqlite3-ruby' s.add_development_dependency 'ruby-debug' + s.add_development_dependency 'mysql' end diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index 5459dd5f5..ee54ba6db 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -30,6 +30,10 @@ class Job < ::ActiveRecord::Base } scope :by_priority, order('priority ASC, run_at ASC') + def self.before_fork + ::ActiveRecord::Base.clear_all_connections! + end + def self.after_fork ::ActiveRecord::Base.establish_connection end diff --git a/spec/database.yml b/spec/database.yml new file mode 100644 index 000000000..2b1ced79e --- /dev/null +++ b/spec/database.yml @@ -0,0 +1,4 @@ +mysql: + adapter: mysql + database: delayed_job + username: root diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6160810b8..4ae589a9e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,8 +13,11 @@ Delayed::Worker.logger = Logger.new('/tmp/dj.log') ENV['RAILS_ENV'] = 'test' +require 'rails' -ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => ':memory:') +config = YAML.load(File.read('spec/database.yml')) +ActiveRecord::Base.configurations = {'test' => config['mysql']} +ActiveRecord::Base.establish_connection ActiveRecord::Base.logger = Delayed::Worker.logger ActiveRecord::Migration.verbose = false From 4add3bad63857bc59e209cbbac2a5a48db4172e4 Mon Sep 17 00:00:00 2001 From: Brandon Keepers Date: Fri, 17 Sep 2010 14:52:29 -0400 Subject: [PATCH 08/10] Update benchmark to just run AR backend --- benchmarks.rb | 32 ++++++-------------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/benchmarks.rb b/benchmarks.rb index ccaa7e45e..c90a40a35 100644 --- a/benchmarks.rb +++ b/benchmarks.rb @@ -1,33 +1,13 @@ -$:.unshift(File.dirname(__FILE__) + '/lib') -require 'rubygems' +require 'spec/spec_helper' require 'logger' -require 'delayed_job' require 'benchmark' -RAILS_ENV = 'test' - -Delayed::Worker.logger = Logger.new('/dev/null') - -BACKENDS = [] -Dir.glob("#{File.dirname(__FILE__)}/spec/setup/*.rb") do |backend| - begin - backend = File.basename(backend, '.rb') - require "spec/setup/#{backend}" - BACKENDS << backend.to_sym - rescue LoadError - puts "Unable to load #{backend} backend! #{$!}" - end -end - +# Delayed::Worker.logger = Logger.new('/dev/null') Benchmark.bm(10) do |x| - BACKENDS.each do |backend| - require "spec/setup/#{backend}" - Delayed::Worker.backend = backend - - n = 10000 - n.times { "foo".delay.length } + Delayed::Job.delete_all + n = 10000 + n.times { "foo".delay.length } - x.report(backend.to_s) { Delayed::Worker.new(:quiet => true).work_off(n) } - end + x.report { Delayed::Worker.new(:quiet => true).work_off(n) } end From 4784f9dde8350a9b097f262fcb8536ea399d69fc Mon Sep 17 00:00:00 2001 From: David Genord II Date: Sun, 26 Sep 2010 01:37:12 -0400 Subject: [PATCH 09/10] Added options to handle_asynchronously --- README.textile | 33 +++++++++++++++++++++++++++++ lib/delayed/message_sending.rb | 14 +++++++++++-- spec/message_sending_spec.rb | 38 ++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/README.textile b/README.textile index 87c260721..e41ca5e29 100644 --- a/README.textile +++ b/README.textile @@ -84,6 +84,39 @@ device = Device.new device.deliver +handle_asynchronously can take as options anything you can pass to delay. In addition the values can be Proc objects allowing call time evaluation of the value. For some examples: + +
+class LongTasks
+  def send_mailer
+    # Some other code
+  end
+  handle_asynchronously :send_mailer, :priority => 20
+
+  def in_the_future
+    # Some other code
+  end
+  # 5.minutes.from_now will be evaluated when in_the_future is called
+  handle_asynchronously :in_the_future, :run_at => Proc.new { 5.minutes.from_now }
+
+  def self.when_to_run
+    2.hours.from_now
+  end
+
+  def call_a_class_method
+    # Some other code
+  end
+  handle_asynchronously :call_a_class_method, :run_at => Proc.new { when_to_run }
+
+  attr_reader :how_important
+
+  def call_an_instance_method
+    # Some other code
+  end
+  handle_asynchronously :call_an_instance_method, :priority => Proc.new {|i| i.how_important }
+end
+
+ h2. Running Jobs @script/delayed_job@ can be used to manage a background process which will start working off jobs. Make sure you've run `script/generate delayed_job`. diff --git a/lib/delayed/message_sending.rb b/lib/delayed/message_sending.rb index 8f7af9ce7..b93bcfce8 100644 --- a/lib/delayed/message_sending.rb +++ b/lib/delayed/message_sending.rb @@ -31,11 +31,21 @@ def send_at(time, method, *args) end module ClassMethods - def handle_asynchronously(method) + def handle_asynchronously(method, opts = {}) aliased_method, punctuation = method.to_s.sub(/([?!=])$/, ''), $1 with_method, without_method = "#{aliased_method}_with_delay#{punctuation}", "#{aliased_method}_without_delay#{punctuation}" define_method(with_method) do |*args| - delay.__send__(without_method, *args) + curr_opts = opts.clone + curr_opts.each_key do |key| + if (val = curr_opts[key]).is_a?(Proc) + curr_opts[key] = if val.arity == 1 + val.call(self) + else + val.call + end + end + end + delay(curr_opts).__send__(without_method, *args) end alias_method_chain method, :delay end diff --git a/spec/message_sending_spec.rb b/spec/message_sending_spec.rb index b7bfc445b..7b7f11040 100644 --- a/spec/message_sending_spec.rb +++ b/spec/message_sending_spec.rb @@ -22,6 +22,44 @@ def tell!(arg) job.payload_object.args.should == [1] }.should change { Delayed::Job.count } end + + describe 'with options' do + class Fable + class << self + attr_accessor :importance + end + def tell + end + handle_asynchronously :tell, :priority => Proc.new { self.importance } + end + + it 'should set the priority based on the Fable importance' do + Fable.importance = 10 + job = Fable.new.tell + job.priority.should == 10 + + Fable.importance = 20 + job = Fable.new.tell + job.priority.should == 20 + end + + describe 'using a proc with parament' do + class Yarn + attr_accessor :importance + def spin + end + handle_asynchronously :spin, :priority => Proc.new {|y| y.importance } + end + + it 'should set the priority based on the Fable importance' do + job = Yarn.new.tap {|y| y.importance = 10 }.spin + job.priority.should == 10 + + job = Yarn.new.tap {|y| y.importance = 20 }.spin + job.priority.should == 20 + end + end + end end context "delay" do From 15678f4530deb48523245c55cbdb5370e5999007 Mon Sep 17 00:00:00 2001 From: guns Date: Mon, 4 Oct 2010 02:50:06 -0500 Subject: [PATCH 10/10] reset :EXIT trap for children; dying children were incorrectly removing master pid file --- lib/delayed/daemon_tasks.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/daemon_tasks.rb b/lib/delayed/daemon_tasks.rb index b960a4946..728932698 100644 --- a/lib/delayed/daemon_tasks.rb +++ b/lib/delayed/daemon_tasks.rb @@ -38,7 +38,7 @@ $0 = "delayed_worker.#{id}" # reset all inherited traps from main process - [:CLD, :HUP, :TERM].each { |sig| trap sig, 'DEFAULT' } + [:CLD, :HUP, :TERM, :EXIT].each { |sig| trap sig, 'DEFAULT' } # lay quiet for a while before booting up if specified sleep delay if delay