Skip to content

Commit

Permalink
clean up ack support; acks are no longer sent automatically by the li…
Browse files Browse the repository at this point in the history
…brary
  • Loading branch information
tmm1 committed Jan 10, 2009
1 parent a2ef015 commit d368f67
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 45 deletions.
13 changes: 11 additions & 2 deletions examples/mq/simple-ack.rb
Expand Up @@ -13,13 +13,19 @@
MQ.queue('awesome').publish('Totally rad 3')

i = 0

# Stopping after the second item was acked will keep the 3rd item in the queue
MQ.queue('awesome').subscribe(:ack => true) do |h,m|
if i == 2
if (i+=1) == 3
puts 'Shutting down...'
AMQP.stop{ EM.stop }
end

if AMQP.closing?
puts "#{m} (ignored, redelivered later)"
else
puts m
i += 1
h.ack
end
end
end
Expand All @@ -29,9 +35,12 @@
Totally rad 1
Totally rad 2
Shutting down...
Totally rad 3 (ignored, redelivered later)

When restarted:

Totally rad 3
Totally rad 1
Shutting down...
Totally rad 2 (ignored, redelivered later)
Totally rad 3 (ignored, redelivered later)
3 changes: 2 additions & 1 deletion lib/amqp.rb
Expand Up @@ -15,6 +15,7 @@ class << self
@logging = false
attr_accessor :logging
attr_reader :conn, :closing
alias :closing? :closing
alias :connection :conn
end

Expand Down Expand Up @@ -89,7 +90,7 @@ class << self
end

def self.stop
if @conn
if @conn and not @closing
@closing = true
@conn.close{
yield if block_given?
Expand Down
12 changes: 10 additions & 2 deletions lib/mq/header.rb
Expand Up @@ -9,9 +9,17 @@ def initialize(mq, header_obj)

# Acknowledges the receipt of this message with the server.
def ack
@mq.callback do
@mq.callback{
@mq.send Protocol::Basic::Ack.new(:delivery_tag => properties[:delivery_tag])
end
}
end

# Reject this message (XXX currently unimplemented in rabbitmq)
# * :requeue => true | false (default false)
def reject opts = {}
@mq.callback{
@mq.send Protocol::Basic::Reject.new(opts.merge(:delivery_tag => properties[:delivery_tag]))
}
end

def method_missing meth, *args, &blk
Expand Down
48 changes: 8 additions & 40 deletions lib/mq/queue.rb
Expand Up @@ -216,8 +216,8 @@ def delete opts = {}
# end
#
# == Options
# * :no_ack => true | false (default true)
# If this field is set the server does not expect acknowledgments
# * :ack => true | false (default false)
# If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
Expand All @@ -230,8 +230,6 @@ def delete opts = {}
# method it will raise a channel or connection exception.
#
def pop opts = {}, &blk
@ack = generate_ack?(opts)

if blk
@on_pop = blk
@on_pop_opts = opts
Expand All @@ -240,7 +238,7 @@ def pop opts = {}, &blk
@mq.callback{
@mq.send Protocol::Basic::Get.new({ :queue => name,
:consumer_tag => name,
:no_ack => no_ack?(opts),
:no_ack => !opts.delete(:ack),
:nowait => true }.merge(opts))
@mq.get_queue{ |q|
q.push(self)
Expand Down Expand Up @@ -285,8 +283,8 @@ def pop opts = {}, &blk
# end
#
# == Options
# * :no_ack => true | false (default true)
# If this field is set the server does not expect acknowledgments
# * :ack => true | false (default false)
# If this field is set to false the server does not expect acknowledgments
# for messages. That is, when a message is delivered to the client
# the server automatically and silently acknowledges it on behalf
# of the client. This functionality increases performance but at
Expand All @@ -306,12 +304,11 @@ def subscribe opts = {}, &blk

@on_msg = blk
@on_msg_opts = opts
@ack = generate_ack?(opts)

@mq.callback{
@mq.send Protocol::Basic::Consume.new({ :queue => name,
:consumer_tag => @consumer_tag,
:no_ack => no_ack?(opts),
:no_ack => !opts.delete(:ack),
:nowait => true }.merge(opts))
}
self
Expand Down Expand Up @@ -372,26 +369,10 @@ def subscribed?
# the headers parameter. See #pop or #subscribe for a code example.
#
def receive headers, body
# XXX why is this here?
if AMQP.closing
#You don't need this if your using ack, and if you aren't it doesn't do much good either
#@mq.callback{
# @mq.send Protocol::Basic::Reject.new({
# :delivery_tag => headers.properties[:delivery_tag],
# :requeue => true
# })
#}
return
end
headers = MQ::Header.new(@mq, headers)

if cb = (@on_msg || @on_pop)
cb.call *(cb.arity == 1 ? [body] : [MQ::Header.new(@mq, headers), body])
end

if @ack && headers && !AMQP.closing
@mq.callback{
@mq.send Protocol::Basic::Ack.new({ :delivery_tag => headers.properties[:delivery_tag] })
}
cb.call *(cb.arity == 1 ? [body] : [headers, body])
end
end

Expand Down Expand Up @@ -442,18 +423,5 @@ def reset
def exchange
@exchange ||= Exchange.new(@mq, :direct, '', :key => name)
end

# Returns true if the options specified indicate that the AMQP
# library should autogenerate an Ack response after processing.
def generate_ack?(options)
options[:no_ack] === false && !options[:ack]
end

# Returns true if the options specified indicate that our
# request to the AMQP server should indicate that no Ack is required
# after delivering. (ie. no_ack == true)
def no_ack?(options)
!options[:ack]
end
end
end

0 comments on commit d368f67

Please sign in to comment.