Skip to content

Commit

Permalink
Add new argument (:auto_ack) to queue.subscribe method to allow more …
Browse files Browse the repository at this point in the history
…flexibility when acknowledging messages
  • Loading branch information
celldee committed Nov 16, 2012
1 parent 60b7e1b commit b4d62be
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
5 changes: 5 additions & 0 deletions lib/bunny/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ module Bunny
# message from the client and will re-queue the message if it does not
# receive one within a time specified by the server.
#
# @option opts [Boolean] :auto_ack
# If set to @false@, the consumer does not send automatic acknowledgements
# to the server. If set to @true@, the consumer does send automatic acknowledgements
# to the server.
#
# @option opts [Boolean] :exclusive (false)
# Request exclusive consumer access, meaning only this consumer can access the queue.
#
Expand Down
9 changes: 6 additions & 3 deletions lib/qrack/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Qrack
# @deprecated
class Subscription

attr_accessor :consumer_tag, :delivery_tag, :message_max, :timeout, :ack, :exclusive
attr_accessor :consumer_tag, :delivery_tag, :message_max, :timeout, :ack, :auto_ack, :exclusive
attr_reader :client, :queue, :message_count

def initialize(client, queue, opts = {})
Expand All @@ -32,6 +32,9 @@ def initialize(client, queue, opts = {})

# Do we want to have to provide an acknowledgement?
@ack = opts[:ack] || nil

# Should the consumer automatically ack messages?
@auto_ack = opts[:auto_ack].nil? ? @ack : opts[:auto_ack]

# Does this consumer want exclusive use of the queue?
@exclusive = opts[:exclusive] || false
Expand Down Expand Up @@ -133,7 +136,7 @@ def start(&blk)
end

# Acknowledge receipt of the final message
queue.ack() if @ack
queue.ack() if @auto_ack

# Quit the loop
break
Expand All @@ -143,7 +146,7 @@ def start(&blk)
# if you are using Client#qos prefetch and you will get extra messages sent through before
# the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
# deferred.
queue.ack() if @ack
queue.ack() if @auto_ack
end
end

Expand Down
15 changes: 15 additions & 0 deletions spec/spec_09/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,21 @@ def message_count(queue, sleep_time = 0.1)
message_count(q).should == 5
q.purge.should == :purge_ok
end

it "should requeue unacked messages when acks are expected but not sent" do
q = @b.queue('test1')
@default_exchange.publish('hello', :key => 'test1')
@default_exchange.publish('world', :key => 'test1')
sleep 0.2
q.subscribe(:ack => true, :auto_ack => false, :timeout => 0.5)
@b.stop
sleep 0.1
@b.start
q = @b.queue('test1')
q.pop[:payload].should == "hello"
q.pop[:payload].should == "world"
q.purge.should == :purge_ok
end

it "should raise an error when delete fails" do
q = @b.queue('test1')
Expand Down

0 comments on commit b4d62be

Please sign in to comment.