Skip to content

Commit

Permalink
Merge remote branch 'collectiveidea/master' into guns
Browse files Browse the repository at this point in the history
  • Loading branch information
guns committed Oct 4, 2010
2 parents 3409b0d + aba9905 commit e046e91
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 37 deletions.
2 changes: 2 additions & 0 deletions Gemfile.lock
Expand Up @@ -49,6 +49,7 @@ GEM
mime-types
treetop (>= 1.4.5)
mime-types (1.16)
mysql (2.8.1)
polyglot (0.3.1)
rack (1.2.1)
rack-mount (0.6.13)
Expand Down Expand Up @@ -88,6 +89,7 @@ DEPENDENCIES
activesupport (~> 3.0)
daemons
delayed_job!
mysql
rails (~> 3.0)
rake
rspec
Expand Down
33 changes: 33 additions & 0 deletions README.textile
Expand Up @@ -84,6 +84,39 @@ device = Device.new
device.deliver
</pre>

handle_asynchronously can take as options anything you can pass to delay. In addition the values can be Proc objects allowing call time evaluation of the value. For some examples:

<pre>
class LongTasks
def send_mailer
# Some other code
end
handle_asynchronously :send_mailer, :priority => 20

def in_the_future
# Some other code
end
# 5.minutes.from_now will be evaluated when in_the_future is called
handle_asynchronously :in_the_future, :run_at => Proc.new { 5.minutes.from_now }

def self.when_to_run
2.hours.from_now
end

def call_a_class_method
# Some other code
end
handle_asynchronously :call_a_class_method, :run_at => Proc.new { when_to_run }

attr_reader :how_important

def call_an_instance_method
# Some other code
end
handle_asynchronously :call_an_instance_method, :priority => Proc.new {|i| i.how_important }
end
</pre>

h2. Running Jobs

@script/delayed_job@ can be used to manage a background process which will start working off jobs. Make sure you've run `script/generate delayed_job`.
Expand Down
32 changes: 6 additions & 26 deletions benchmarks.rb
@@ -1,33 +1,13 @@
$:.unshift(File.dirname(__FILE__) + '/lib')
require 'rubygems'
require 'spec/spec_helper'
require 'logger'
require 'delayed_job'
require 'benchmark'

RAILS_ENV = 'test'

Delayed::Worker.logger = Logger.new('/dev/null')

BACKENDS = []
Dir.glob("#{File.dirname(__FILE__)}/spec/setup/*.rb") do |backend|
begin
backend = File.basename(backend, '.rb')
require "spec/setup/#{backend}"
BACKENDS << backend.to_sym
rescue LoadError
puts "Unable to load #{backend} backend! #{$!}"
end
end

# Delayed::Worker.logger = Logger.new('/dev/null')

Benchmark.bm(10) do |x|
BACKENDS.each do |backend|
require "spec/setup/#{backend}"
Delayed::Worker.backend = backend

n = 10000
n.times { "foo".delay.length }
Delayed::Job.delete_all
n = 10000
n.times { "foo".delay.length }

x.report(backend.to_s) { Delayed::Worker.new(:quiet => true).work_off(n) }
end
x.report { Delayed::Worker.new(:quiet => true).work_off(n) }
end
1 change: 1 addition & 0 deletions delayed_job.gemspec
Expand Up @@ -24,5 +24,6 @@ This gem is collectiveidea's fork (http://github.com/collectiveidea/delayed_job)
s.add_development_dependency 'rails', '~>3.0'
s.add_development_dependency 'sqlite3-ruby'
s.add_development_dependency 'ruby-debug'
s.add_development_dependency 'mysql'
end

4 changes: 4 additions & 0 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -30,6 +30,10 @@ class Job < ::ActiveRecord::Base
}
scope :by_priority, order('priority ASC, run_at ASC')

def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end

def self.after_fork
::ActiveRecord::Base.establish_connection
end
Expand Down
7 changes: 6 additions & 1 deletion lib/delayed/backend/base.rb
Expand Up @@ -96,12 +96,17 @@ def hook(name, *args)
end
end

def reschedule_at
payload_object.respond_to?(:reschedule_at) ?
payload_object.reschedule_at(self.class.db_time_now, attempts) :
self.class.db_time_now + (attempts ** 4) + 5
end

protected

def set_default_run_at
self.run_at ||= self.class.db_time_now
end

end
end
end
14 changes: 11 additions & 3 deletions lib/delayed/backend/shared_spec.rb
Expand Up @@ -294,7 +294,7 @@ def create_job(opts = {})
@job.locked_at.should be_nil
end
end

context "large handler" do
before do
text = "Lorem ipsum dolor sit amet. " * 1000
Expand Down Expand Up @@ -402,8 +402,8 @@ def create_job(opts = {})
# reset defaults
Delayed::Worker.destroy_failed_jobs = true
Delayed::Worker.max_attempts = 25

@job = Delayed::Job.enqueue ErrorJob.new
@job = Delayed::Job.enqueue(ErrorJob.new)
end

it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
Expand All @@ -425,6 +425,14 @@ def create_job(opts = {})
@job.run_at.should > Delayed::Job.db_time_now - 10.minutes
@job.run_at.should < Delayed::Job.db_time_now + 10.minutes
end

it 'should re-schedule with handler provided time if present' do
@job = Delayed::Job.enqueue(CustomRescheduleJob.new(99.minutes))
@worker.run(@job)
@job.reload

(Delayed::Job.db_time_now + 99.minutes - @job.run_at).abs.should < 1
end
end

context "reschedule" do
Expand Down
5 changes: 3 additions & 2 deletions lib/delayed/command.rb
Expand Up @@ -44,8 +44,9 @@ def initialize(args)
opts.on('-m', '--monitor', 'Start monitor process.') do
@monitor = true
end


opts.on('--sleep-delay N', "Amount of time to sleep when no jobs are found") do |n|
@options[:sleep_delay] = n
end
end
@args = opts.parse!(args)
end
Expand Down
14 changes: 12 additions & 2 deletions lib/delayed/message_sending.rb
Expand Up @@ -31,11 +31,21 @@ def send_at(time, method, *args)
end

module ClassMethods
def handle_asynchronously(method)
def handle_asynchronously(method, opts = {})
aliased_method, punctuation = method.to_s.sub(/([?!=])$/, ''), $1
with_method, without_method = "#{aliased_method}_with_delay#{punctuation}", "#{aliased_method}_without_delay#{punctuation}"
define_method(with_method) do |*args|
delay.__send__(without_method, *args)
curr_opts = opts.clone
curr_opts.each_key do |key|
if (val = curr_opts[key]).is_a?(Proc)
curr_opts[key] = if val.arity == 1
val.call(self)
else
val.call
end
end
end
delay(curr_opts).__send__(without_method, *args)
end
alias_method_chain method, :delay
end
Expand Down
5 changes: 3 additions & 2 deletions lib/delayed/worker.rb
Expand Up @@ -45,6 +45,7 @@ def initialize(options={})
@quiet = options.has_key?(:quiet) ? options[:quiet] : true
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
end

# Every worker has a unique name which by default is the pid of the process. There are some
Expand Down Expand Up @@ -80,7 +81,7 @@ def start
break if $exit

if count.zero?
sleep(@@sleep_delay)
sleep(self.class.sleep_delay)
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
end
Expand Down Expand Up @@ -128,7 +129,7 @@ def run(job)
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(job, time = nil)
if (job.attempts += 1) < self.class.max_attempts
time ||= Job.db_time_now + (job.attempts ** 4) + 5
time ||= job.reschedule_at
job.run_at = time
job.unlock
job.save!
Expand Down
4 changes: 4 additions & 0 deletions spec/database.yml
@@ -0,0 +1,4 @@
mysql:
adapter: mysql
database: delayed_job
username: root
38 changes: 38 additions & 0 deletions spec/message_sending_spec.rb
Expand Up @@ -22,6 +22,44 @@ def tell!(arg)
job.payload_object.args.should == [1]
}.should change { Delayed::Job.count }
end

describe 'with options' do
class Fable
class << self
attr_accessor :importance
end
def tell
end
handle_asynchronously :tell, :priority => Proc.new { self.importance }
end

it 'should set the priority based on the Fable importance' do
Fable.importance = 10
job = Fable.new.tell
job.priority.should == 10

Fable.importance = 20
job = Fable.new.tell
job.priority.should == 20
end

describe 'using a proc with parament' do
class Yarn
attr_accessor :importance
def spin
end
handle_asynchronously :spin, :priority => Proc.new {|y| y.importance }
end

it 'should set the priority based on the Fable importance' do
job = Yarn.new.tap {|y| y.importance = 10 }.spin
job.priority.should == 10

job = Yarn.new.tap {|y| y.importance = 20 }.spin
job.priority.should == 20
end
end
end
end

context "delay" do
Expand Down
6 changes: 6 additions & 0 deletions spec/sample_jobs.rb
Expand Up @@ -14,6 +14,12 @@ class ErrorJob
def perform; raise 'did not work'; end
end

class CustomRescheduleJob < Struct.new(:offset)
cattr_accessor :runs; self.runs = 0
def perform; raise 'did not work'; end
def reschedule_at(time, attempts); time + offset; end
end

class LongRunningJob
def perform; sleep 250; end
end
Expand Down
5 changes: 4 additions & 1 deletion spec/spec_helper.rb
Expand Up @@ -13,8 +13,11 @@

Delayed::Worker.logger = Logger.new('/tmp/dj.log')
ENV['RAILS_ENV'] = 'test'
require 'rails'

ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => ':memory:')
config = YAML.load(File.read('spec/database.yml'))
ActiveRecord::Base.configurations = {'test' => config['mysql']}
ActiveRecord::Base.establish_connection
ActiveRecord::Base.logger = Delayed::Worker.logger
ActiveRecord::Migration.verbose = false

Expand Down

0 comments on commit e046e91

Please sign in to comment.