Skip to content

Commit

Permalink
use header.reject instead of queue.recover in the subscriver logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Kaes committed Dec 10, 2010
1 parent 82628d2 commit 97ffdac
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 19 deletions.
7 changes: 7 additions & 0 deletions RELEASE_NOTES.rdoc
@@ -1,6 +1,13 @@
= Release Notes

== Version 0.2.9.8

* since version 2.0, RabbitMQ supports Basic.reject(:requeue => true). we use it now too,
because it enhances performance of message processors


== Version 0.2.9.7

* use new bunny_ext gem and allow specification of global publishing timeouts
* registering a message now automatically registers the corresponding exchange
* don't try to bind queues for an exchange hich has no queue
Expand Down
2 changes: 1 addition & 1 deletion beetle.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = "beetle"
s.version = "0.2.9.7"
s.version = "0.2.9.8"

s.required_rubygems_version = ">= 1.3.1"
s.authors = ["Stefan Kaes", "Pascal Friederich", "Ali Jelveh", "Sebastian Roebke"]
Expand Down
16 changes: 8 additions & 8 deletions lib/beetle/r_c.rb
Expand Up @@ -4,7 +4,7 @@ module RC #:nodoc:all
# message processing result return codes
class ReturnCode
def initialize(*args)
@recover = args.delete :recover
@reject = args.delete :reject
@failure = args.delete :failure
@name = args.first
end
Expand All @@ -13,8 +13,8 @@ def inspect
@name.blank? ? super : "Beetle::RC::#{@name}"
end

def recover?
@recover
def reject?
@reject
end

def failure?
Expand All @@ -30,11 +30,11 @@ def self.rc(name, *args)
rc :Ancient
rc :AttemptsLimitReached, :failure
rc :ExceptionsLimitReached, :failure
rc :Delayed, :recover
rc :HandlerCrash, :recover
rc :HandlerNotYetTimedOut, :recover
rc :MutexLocked, :recover
rc :InternalError, :recover
rc :Delayed, :reject
rc :HandlerCrash, :reject
rc :HandlerNotYetTimedOut, :reject
rc :MutexLocked, :reject
rc :InternalError, :reject
rc :DecodingError, :failure

end
Expand Down
4 changes: 2 additions & 2 deletions lib/beetle/subscriber.rb
Expand Up @@ -107,9 +107,9 @@ def create_subscription_callback(queue_name, amqp_queue_name, handler, opts)
message_options = opts.merge(:server => server, :store => @client.deduplication_store)
m = Message.new(amqp_queue_name, header, data, message_options)
result = m.process(processor)
if result.recover?
if result.reject?
sleep 1
mq(server).recover
header.reject(:requeue => true)
elsif reply_to = header.properties[:reply_to]
# require 'ruby-debug'
# Debugger.start
Expand Down
6 changes: 3 additions & 3 deletions test/beetle/message_test.rb
Expand Up @@ -543,7 +543,7 @@ def setup
handler.expects(:process_failure).never
result = message.process(handler)
assert_equal RC::HandlerCrash, result
assert result.recover?
assert result.reject?
assert !result.failure?
end

Expand All @@ -561,7 +561,7 @@ def setup
failback.expects(:call).once
result = message.process(handler)
assert_equal RC::AttemptsLimitReached, result
assert !result.recover?
assert !result.reject?
assert result.failure?
end

Expand All @@ -578,7 +578,7 @@ def setup
failback.expects(:call).once
result = message.process(handler)
assert_equal RC::ExceptionsLimitReached, result
assert !result.recover?
assert !result.reject?
assert result.failure?
end

Expand Down
8 changes: 3 additions & 5 deletions test/beetle/subscriber_test.rb
Expand Up @@ -169,15 +169,13 @@ def setup
assert_nothing_raised { @callback.call(header, 'foo') }
end

test "should call recover on the server when processing the handler returns true on recover?" do
test "should call reject on the message header when processing the handler returns true on recover?" do
header = header_with_params({})
result = mock("result")
result.expects(:recover?).returns(true)
result.expects(:reject?).returns(true)
Message.any_instance.expects(:process).returns(result)
@sub.expects(:sleep).with(1)
mq = mock("MQ")
mq.expects(:recover)
@sub.expects(:mq).with(@sub.server).returns(mq)
header.expects(:reject).with(:requeue => true)
@callback.call(header, 'foo')
end

Expand Down

0 comments on commit 97ffdac

Please sign in to comment.