Browse files

remove excess whitespace

  • Loading branch information...
1 parent e9499f5 commit 0581928d6a38dc0835092045ce40b25435a5246a @technoweenie technoweenie committed Sep 1, 2008
Showing with 208 additions and 234 deletions.
  1. +78 −81 lib/delayed/job.rb
  2. +2 −2 lib/delayed/message_sending.rb
  3. +9 −9 lib/delayed/performable_method.rb
  4. +50 −59 spec/delayed_method_spec.rb
  5. +59 −73 spec/job_spec.rb
  6. +10 −10 spec/story_spec.rb
View
159 lib/delayed/job.rb
@@ -1,102 +1,99 @@
module Delayed
class DeserializationError < StandardError
- end
+ end
- class Job < ActiveRecord::Base
+ class Job < ActiveRecord::Base
set_table_name :delayed_jobs
cattr_accessor :worker_name
self.worker_name = "pid:#{Process.pid}"
-
-
+
NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
class LockError < StandardError
- end
+ end
def self.clear_locks!
connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end
-
+
def payload_object
@payload_object ||= deserialize(self['handler'])
end
-
+
def payload_object=(object)
self['handler'] = object.to_yaml
end
-
- def reshedule(message, time = nil)
+
+ def reshedule(message, time = nil)
time ||= Job.db_time_now + (attempts ** 4).seconds + 5
-
+
self.attempts += 1
self.run_at = time
- self.last_error = message
+ self.last_error = message
self.unlock
save!
- end
-
-
+ end
+
def self.enqueue(object, priority = 0)
unless object.respond_to?(:perform)
- raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
+ raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
-
- Job.create(:payload_object => object, :priority => priority)
- end
-
+
+ Job.create(:payload_object => object, :priority => priority)
+ end
+
def self.find_available(limit = 5)
time_now = db_time_now
- ActiveRecord::Base.silence do
+ ActiveRecord::Base.silence do
find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => limit)
end
end
-
+
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
- def self.reserve(max_run_time = 4.hours)
-
- # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
- # this leads to a more even distribution of jobs across the worker processes
- find_available(5).each do |job|
+ def self.reserve(max_run_time = 4.hours)
+
+ # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
+ # This leads to a more even distribution of jobs across the worker processes
+ find_available(5).each do |job|
begin
job.lock_exclusively!(max_run_time, worker_name)
- yield job.payload_object
+ yield job.payload_object
job.destroy
- return job
+ return job
rescue LockError
# We did not get the lock, some other worker process must have
puts "failed to aquire exclusive lock for #{job.id}"
- rescue StandardError => e
- job.reshedule e.message
- return job
+ rescue StandardError => e
+ job.reshedule e.message
+ return job
end
end
nil
- end
+ end
# This method is used internally by reserve method to ensure exclusive access
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
- now = self.class.db_time_now
-
- affected_rows = if locked_by != worker
-
-
+ now = self.class.db_time_now
+
+ affected_rows = if locked_by != worker
+
# We don't own this job so we will update the locked_by name and the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now + max_run_time)})
end_sql
- else
-
- # We alrady own this job, this may happen if the job queue crashes.
+ else
+
+ # We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
@@ -105,95 +102,95 @@ def lock_exclusively!(max_run_time, worker = worker_name)
end_sql
end
-
+
unless affected_rows == 1
raise LockError, "Attempted to aquire exclusive lock failed"
- end
-
+ end
+
self.locked_at = now
- self.locked_by = worker
- end
-
+ self.locked_by = worker
+ end
+
def unlock
self.locked_at = nil
self.locked_by = nil
end
-
+
def self.work_off(num = 100)
success, failure = 0, 0
-
+
num.times do
-
+
job = self.reserve do |j|
begin
- j.perform
+ j.perform
success += 1
- rescue
+ rescue
failure += 1
raise
end
end
-
+
break if job.nil?
- end
-
+ end
+
return [success, failure]
end
-
+
private
-
- def deserialize(source)
+
+ def deserialize(source)
attempt_to_load_file = true
-
- begin
- handler = YAML.load(source) rescue nil
- return handler if handler.respond_to?(:perform)
-
+
+ begin
+ handler = YAML.load(source) rescue nil
+ return handler if handler.respond_to?(:perform)
+
if handler.nil?
if source =~ ParseObjectFromYaml
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
- return handler if handler.respond_to?(:perform)
+ return handler if handler.respond_to?(:perform)
end
end
-
+
if handler.is_a?(YAML::Object)
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
-
- raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-
+
+ raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+
rescue TypeError, LoadError, NameError => e
-
- raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
+
+ raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
end
-
+
def attempt_to_load(klass)
- klass.constantize
+ klass.constantize
end
def self.db_time_now
- (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
end
-
- protected
-
+
+ protected
+
def before_save
self.run_at ||= self.class.db_time_now
- end
-
+ end
+
end
end
View
4 lib/delayed/message_sending.rb
@@ -1,7 +1,7 @@
module Delayed
module MessageSending
- def send_later(method, *args)
+ def send_later(method, *args)
Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
end
- end
+ end
end
View
18 lib/delayed/performable_method.rb
@@ -1,22 +1,22 @@
module Delayed
- class PerformableMethod < Struct.new(:object, :method, :args)
+ class PerformableMethod < Struct.new(:object, :method, :args)
AR_STRING_FORMAT = /^AR\:([A-Z]\w+)\:(\d+)$/
-
+
def initialize(object, method, args)
raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method)
-
+
self.object = dump(object)
self.args = args.map { |a| dump(a) }
self.method = method.to_sym
end
-
+
def perform
load(object).send(method, *args.map{|a| load(a)})
rescue ActiveRecord::RecordNotFound
# We cannot do anything about objects which were deleted in the meantime
true
- end
-
+ end
+
private
def load(arg)
@@ -25,14 +25,14 @@ def load(arg)
else arg
end
end
-
+
def dump(arg)
case arg
when ActiveRecord::Base then ar_to_string(arg)
else arg
- end
+ end
end
-
+
def ar_to_string(obj)
"AR:#{obj.class}:#{obj.id}"
end
View
109 spec/delayed_method_spec.rb
@@ -1,119 +1,110 @@
require File.dirname(__FILE__) + '/database'
-if not defined?(:ActiveRecord)
- module ActiveRecord
- class RecordNotFound < StandardError
- end
- end
-end
-
-
class SimpleJob
- cattr_accessor :runs; self.runs = 0
+ cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
end
class RandomRubyObject
def say_hello
'hello'
- end
-end
+ end
+end
class ErrorObject
-
+
def throw
- raise ActiveRecord::RecordNotFound, '...'
+ raise ActiveRecord::RecordNotFound, '...'
false
end
-
-end
+
+end
class StoryReader
-
+
def read(story)
- "Epilog: #{story.tell}"
+ "Epilog: #{story.tell}"
end
-
+
end
class StoryReader
-
+
def read(story)
- "Epilog: #{story.tell}"
+ "Epilog: #{story.tell}"
end
-
+
end
-
describe 'random ruby objects' do
-
+
before { reset_db }
it "should respond_to :send_later method" do
-
- RandomRubyObject.new.respond_to?(:send_later)
-
- end
-
+
+ RandomRubyObject.new.respond_to?(:send_later)
+
+ end
+
it "should raise a ArgumentError if send_later is called but the target method doesn't exist" do
lambda { RandomRubyObject.new.send_later(:method_that_deos_not_exist) }.should raise_error(NoMethodError)
end
-
- it "should add a new entry to the job table when send_later is called on it" do
+
+ it "should add a new entry to the job table when send_later is called on it" do
Delayed::Job.count.should == 0
-
+
RandomRubyObject.new.send_later(:to_s)
Delayed::Job.count.should == 1
end
-
+
it "should run get the original method executed when the job is performed" do
-
+
RandomRubyObject.new.send_later(:say_hello)
-
- Delayed::Job.count.should == 1
- end
+
+ Delayed::Job.count.should == 1
+ end
it "should ignore ActiveRecord::RecordNotFound errors because they are permanent" do
-
- ErrorObject.new.send_later(:throw)
- Delayed::Job.count.should == 1
-
+ ErrorObject.new.send_later(:throw)
+
+ Delayed::Job.count.should == 1
+
output = nil
-
+
Delayed::Job.reserve do |e|
output = e.perform
end
-
+
output.should == true
-
- end
-
- it "should store the object as string if its an active record" do
- story = Story.create :text => 'Once upon...'
- story.send_later(:tell)
-
+
+ end
+
+ it "should store the object as string if its an active record" do
+ story = Story.create :text => 'Once upon...'
+ story.send_later(:tell)
+
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.object.should == 'AR:Story:1'
job.payload_object.method.should == :tell
- job.payload_object.args.should == []
+ job.payload_object.args.should == []
job.payload_object.perform.should == 'Once upon...'
- end
-
+ end
+
it "should store arguments as string if they an active record" do
-
- story = Story.create :text => 'Once upon...'
-
- reader = StoryReader.new
+
+ story = Story.create :text => 'Once upon...'
+
+ reader = StoryReader.new
reader.send_later(:read, story)
-
+
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :read
job.payload_object.args.should == ['AR:Story:1']
- job.payload_object.perform.should == 'Epilog: Once upon...'
+ job.payload_object.perform.should == 'Epilog: Once upon...'
end
-
+
end
View
132 spec/job_spec.rb
@@ -1,134 +1,120 @@
require File.dirname(__FILE__) + '/database'
class SimpleJob
- cattr_accessor :runs; self.runs = 0
+ cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
-end
+end
class ErrorJob
- cattr_accessor :runs; self.runs = 0
- def perform; raise 'did not work'; end
+ cattr_accessor :runs; self.runs = 0
+ def perform; raise 'did not work'; end
end
describe Delayed::Job do
-
- before :each do
+
+ before :each do
reset_db
- end
+ end
it "should set run_at automatically" do
Delayed::Job.create(:payload_object => ErrorJob.new ).run_at.should_not == nil
- end
+ end
it "should raise ArgumentError when handler doesn't respond_to :perform" do
lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
end
-
+
it "should increase count after enqueuing items" do
- Delayed::Job.enqueue SimpleJob.new
+ Delayed::Job.enqueue SimpleJob.new
Delayed::Job.count.should == 1
end
-
- it "should call perform on jobs when running work_off" do
+
+ it "should call perform on jobs when running work_off" do
SimpleJob.runs.should == 0
-
- Delayed::Job.enqueue SimpleJob.new
+
+ Delayed::Job.enqueue SimpleJob.new
Delayed::Job.work_off
-
- SimpleJob.runs.should == 1
- end
-
- it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
- Delayed::Job.enqueue ErrorJob.new
- runner = Delayed::Job.work_off(1)
+
+ SimpleJob.runs.should == 1
+ end
+
+ it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
+ Delayed::Job.enqueue ErrorJob.new
+ runner = Delayed::Job.work_off(1)
job = Delayed::Job.find(:first)
job.last_error.should == 'did not work'
job.attempts.should == 1
- job.run_at.should > Time.now
- job.run_at.should < Time.now + 6.minutes
- end
-
+ job.run_at.should > Time.now
+ job.run_at.should < Time.now + 6.minutes
+ end
+
it "should raise an DeserializationError when the job class is totally unknown" do
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should try to load the class when it is unknown at the time of the deserialization" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
-
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
+
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should try include the namespace when loading unknown objects" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
- job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
-
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should also try to load structs when they are unknown (raises TypeError)" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
-
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
+
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
it "should try include the namespace when loading unknown structs" do
- job = Delayed::Job.new
+ job = Delayed::Job.new
job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
- job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
- end
-
-
+ job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
+ end
+
describe "when another worker is already performing an task, it" do
-
+
before :each do
Delayed::Job.worker_name = 'worker1'
@job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Time.now.utc
end
-
- it "should not allow a second worker to get exclusive access" do
- lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
- end
-
- it "should be able to get access to the task if it was started more then max_age ago" do
+
+ it "should not allow a second worker to get exclusive access" do
+ lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
+ end
+
+ it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@job.save
- @job.lock_exclusively! 4.hours, 'worker2'
+ @job.lock_exclusively! 4.hours, 'worker2'
@job.reload
@job.locked_by.should == 'worker2'
@job.locked_at.should > 1.minute.ago
end
- it "should be able to get exclusive access again when the worker name is the same" do
- @job.lock_exclusively! Time.now + 20, 'worker1'
+ it "should be able to get exclusive access again when the worker name is the same" do
+ @job.lock_exclusively! Time.now + 20, 'worker1'
@job.lock_exclusively! Time.now + 21, 'worker1'
- @job.lock_exclusively! Time.now + 22, 'worker1'
- end
+ @job.lock_exclusively! Time.now + 22, 'worker1'
+ end
end
-
-end
-
-
-
-
-
-
-
-
-
-
-
+end
View
20 spec/story_spec.rb
@@ -1,18 +1,18 @@
-require File.dirname(__FILE__) + '/database'
+require File.dirname(__FILE__) + '/database'
describe "A story" do
-
- before do
+
+ before do
reset_db
Story.create :text => "Once upon a time..."
end
-
+
it "should be shared" do
- Story.find(:first).tell.should == 'Once upon a time...'
- end
-
+ Story.find(:first).tell.should == 'Once upon a time...'
+ end
+
it "should not return its result if it storytelling is delayed" do
- Story.find(:first).send_later(:tell).should_not == 'Once upon a time...'
- end
-
+ Story.find(:first).send_later(:tell).should_not == 'Once upon a time...'
+ end
+
end

0 comments on commit 0581928

Please sign in to comment.