Skip to content

Commit

Permalink
Refactor hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
bkeepers committed Sep 6, 2010
1 parent 129b612 commit a7751a8
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 65 deletions.
47 changes: 26 additions & 21 deletions lib/delayed/backend/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@ module Base
def self.included(base)
base.extend ClassMethods
end

module ClassMethods
# Add a job to the queue
def enqueue(*args)
object = args.shift
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end

priority = args.first || Delayed::Worker.default_priority
run_at = args[1]
self.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

# Hook method that is called before a new worker is forked
def before_fork
end

# Hook method that is called after a new worker is forked
def after_fork
end

def work_off(num = 100)
warn "[DEPRECATION] `Delayed::Job.work_off` is deprecated. Use `Delayed::Worker.new.work_off instead."
Delayed::Worker.new.work_off(num)
end
end

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

def failed?
Expand All @@ -52,40 +52,45 @@ def name
def payload_object=(object)
self.handler = object.to_yaml
end

def payload_object
@payload_object ||= YAML.load(self.handler)
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file. Handler: #{handler.inspect}"
end

# Moved into its own method so that new_relic can trace it.
def invoke_job
payload_object.before(self) if payload_object.respond_to?(:before)
begin
payload_object.perform
payload_object.success(self) if payload_object.respond_to?(:success)
rescue Exception => e
payload_object.failure(self, e) if payload_object.respond_to?(:failure)
raise e
ensure
payload_object.after(self) if payload_object.respond_to?(:after)
end
hook :before
payload_object.perform
hook :success
rescue Exception => e
hook :error, e
raise e
ensure
hook :after
end

# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
self.locked_by = nil
end


def hook(name, *args)
if payload_object.respond_to?(name)
method = payload_object.method(name)
args.unshift(self)
method.call(*args.slice(0, method.arity))
end
end

protected

def set_default_run_at
self.run_at ||= self.class.db_time_now
end

end
end
end
41 changes: 24 additions & 17 deletions lib/delayed/backend/shared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,29 @@ def create_job(opts = {})

describe "callbacks" do
before(:each) do
SuccessfulCallbackJob.messages = []
FailureCallbackJob.messages = []
CallbackJob.messages = []
end

it "should call before and after callbacks" do
job = described_class.enqueue(SuccessfulCallbackJob.new)
job = described_class.enqueue(CallbackJob.new)
job.invoke_job
SuccessfulCallbackJob.messages.should == ["before perform", "perform", "success!", "after perform"]
CallbackJob.messages.should == ["before", "perform", "success", "after"]
end

it "should call the after callback with an error" do
job = described_class.enqueue(FailureCallbackJob.new)
lambda {job.invoke_job}.should raise_error
FailureCallbackJob.messages.should == ["before perform", "error: RuntimeError", "after perform"]
job = described_class.enqueue(CallbackJob.new)
job.payload_object.should_receive(:perform).and_raise(RuntimeError.new("fail"))

lambda { job.invoke_job }.should raise_error
CallbackJob.messages.should == ["before", "error: RuntimeError", "after"]
end

it "should call error when before raises an error" do
job = described_class.enqueue(CallbackJob.new)
job.payload_object.should_receive(:before).and_raise(RuntimeError.new("fail"))
lambda { job.invoke_job }.should raise_error(RuntimeError)
CallbackJob.messages.should == ["error: RuntimeError", "after"]
end
end

describe "payload_object" do
Expand Down Expand Up @@ -394,33 +401,33 @@ def create_job(opts = {})
end

share_examples_for "any failure more than Worker.max_attempts times" do
context "when the job's payload has an #on_permanent_failure hook" do
context "when the job's payload has a #failure hook" do
before do
@job = Delayed::Job.create :payload_object => OnPermanentFailureJob.new
@job.payload_object.should respond_to :on_permanent_failure
@job.payload_object.should respond_to :failure
end

it "should run that hook" do
@job.payload_object.should_receive :on_permanent_failure
@job.payload_object.should_receive :failure
Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
end
end

context "when the job's payload has no #on_permanent_failure hook" do
context "when the job's payload has no #failure hook" do
# It's a little tricky to test this in a straightforward way,
# because putting a should_not_receive expectation on
# @job.payload_object.on_permanent_failure makes that object
# @job.payload_object.failure makes that object
# incorrectly return true to
# payload_object.respond_to? :on_permanent_failure, which is what
# reschedule uses to decide whether to call on_permanent_failure.
# payload_object.respond_to? :failure, which is what
# reschedule uses to decide whether to call failure.
# So instead, we just make sure that the payload_object as it
# already stands doesn't respond_to? on_permanent_failure, then
# already stands doesn't respond_to? failure, then
# shove it through the iterated reschedule loop and make sure we
# don't get a NoMethodError (caused by calling that nonexistent
# on_permanent_failure method).
# failure method).

before do
@job.payload_object.should_not respond_to(:on_permanent_failure)
@job.payload_object.should_not respond_to(:failure)
end

it "should not try to run that hook" do
Expand Down
13 changes: 3 additions & 10 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,10 @@ def reschedule(job, time = nil)
job.save!
else
say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

if job.payload_object.respond_to? :on_permanent_failure
say "Running on_permanent_failure hook"
failure_method = job.payload_object.method(:on_permanent_failure)
if failure_method.arity == 1
failure_method.call(job)
else
failure_method.call
end
if job.respond_to?(:on_permanent_failure)
warn "[DEPRECATION] The #on_permanent_failure hook has been renamed to #failure."
end

job.hook(:failure)
self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now)
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/performable_method_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
@method.perform
end

it "should respond to on_permanent_failure when implemented and target object is called via object.delay.do_something" do
it "should respond to failure when implemented and target object is called via object.delay.do_something" do
@method = Delayed::PerformableMethod.new(OnPermanentFailureJob.new, :perform, [])
@method.respond_to?(:on_permanent_failure).should be_true
@method.object.should_receive(:on_permanent_failure)
@method.on_permanent_failure
@method.respond_to?(:failure).should be_true
@method.object.should_receive(:failure)
@method.failure
end
end

Expand Down
24 changes: 11 additions & 13 deletions spec/sample_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform; sleep 250; end
end

class OnPermanentFailureJob < SimpleJob
def on_permanent_failure
def failure
end
end

Expand All @@ -30,32 +30,30 @@ def perform; @@runs += 1; end
end
end

class SuccessfulCallbackJob
class CallbackJob
cattr_accessor :messages

def before(job)
SuccessfulCallbackJob.messages << 'before perform'
self.class.messages << 'before'
end

def perform
SuccessfulCallbackJob.messages << 'perform'
self.class.messages << 'perform'
end

def after(job, error = nil)
SuccessfulCallbackJob.messages << 'after perform'
self.class.messages << 'after'
end

def success(job)
SuccessfulCallbackJob.messages << 'success!'
self.class.messages << 'success'
end
def failure(job, error)
SuccessfulCallbackJob.messages << "error: #{error.class}"

def error(job, error)
self.class.messages << "error: #{error.class}"
end
end

class FailureCallbackJob < SuccessfulCallbackJob
def perform
raise "failure job"
def failure(job)
self.class.messages << 'failure'
end
end

0 comments on commit a7751a8

Please sign in to comment.