Permalink
Browse files

Fixed handling of unsubscribe race condition.

It was possible to cancel a subscription via timeout or cancellator
while in the process of receiving a message. This would cause the
unsubscribe to fail, and would leave unparsed junk in the buffer. This
could also cause the message to be dropped, which is bad.

The solution is to detect when the unsubscribe fails and parse the
new-found message like nothing happened. Unfortuntely, the CancelOk
response from the unsubscribe is still somewhere in our buffer and needs
to be processed or else it will break things. The logic is more
complicated as a result, but should be able to handle all possible
states of the buffer.
  • Loading branch information...
kixelated committed Mar 13, 2012
1 parent 0ce631b commit 04e71a4b3e059723fa1ccf48b6085e2b99107b6f
Showing with 67 additions and 16 deletions.
  1. +2 −0 lib/bunny/client.rb
  2. +7 −4 lib/bunny/queue.rb
  3. +1 −1 lib/bunny/subscription.rb
  4. +2 −1 lib/qrack/client.rb
  5. +55 −10 lib/qrack/subscription.rb
View
@@ -36,6 +36,8 @@ def initialize(connection_string_or_opts = Hash.new, opts = Hash.new)
# Checks response from AMQP methods and takes appropriate action
def check_response(received_method, expected_method, err_msg, err_class = Bunny::ProtocolError)
+ @last_method = received_method
+
case
when received_method.is_a?(Qrack::Protocol::Connection::Close)
# Clean up the socket
View
@@ -291,6 +291,9 @@ def unbind(exchange, opts = {})
# @option opts [Boolean] :nowait (false)
# Ignored by Bunny, always @false@.
#
+ # @option opts [Boolean] :nowait (false)
+ # Ignored by Bunny, always @false@.
+ #
# @return [Symbol] @:unsubscribe_ok@ if successful
def unsubscribe(opts = {})
# Default consumer_tag from subscription if not passed in
@@ -303,13 +306,13 @@ def unsubscribe(opts = {})
# Cancel consumer
client.send_frame(Qrack::Protocol::Basic::Cancel.new(:consumer_tag => consumer_tag, :nowait => false))
- method = client.next_method
-
- client.check_response(method, Qrack::Protocol::Basic::CancelOk, "Error unsubscribing from queue #{name}")
-
# Reset subscription
@default_consumer = nil
+ method = client.next_method
+
+ client.check_response(method, Qrack::Protocol::Basic::CancelOk, "Error unsubscribing from queue #{name}, got #{method.class}")
+
# Return confirmation
:unsubscribe_ok
end
@@ -82,7 +82,7 @@ def setup_consumer
method = client.next_method
- client.check_response(method, Qrack::Protocol::Basic::ConsumeOk, "Error subscribing to queue #{queue.name}")
+ client.check_response(method, Qrack::Protocol::Basic::ConsumeOk, "Error subscribing to queue #{queue.name}, got #{method}")
@consumer_tag = method.consumer_tag
end
View
@@ -14,7 +14,7 @@ class Client
CONNECT_TIMEOUT = 5.0
RETRY_DELAY = 10.0
- attr_reader :status, :host, :vhost, :port, :logging, :spec, :heartbeat
+ attr_reader :status, :host, :vhost, :port, :logging, :spec, :heartbeat, :last_method
attr_accessor :channel, :logfile, :exchanges, :queues, :channels, :message_in, :message_out, :connecting
@@ -51,6 +51,7 @@ def initialize(connection_string_or_opts = Hash.new, opts = Hash.new)
create_logger if @logging
@message_in = false
@message_out = false
+ @last_method = nil
@connecting = false
@channels ||= []
# Create channel 0
View
@@ -55,41 +55,86 @@ def start(&blk)
# Notify server about new consumer
setup_consumer
+ # We need to keep track of three possible subscription states
+ # :subscribed, :pending, and :unsubscribed
+ # 'pending' occurs because of network latency, where we tried to unsubscribe but were already given a message
+ subscribe_state = :subscribed
+
# Start subscription loop
loop do
begin
- method = client.next_method(:timeout => timeout,
- :cancellator => @cancellator)
+ method = client.next_method(:timeout => timeout, :cancellator => @cancellator)
rescue Qrack::FrameTimeout
- queue.unsubscribe
- break
+ begin
+ queue.unsubscribe
+ subscribe_state = :unsubscribed
+
+ break
+ rescue Bunny::ProtocolError
+ # Unsubscribe failed because we actually got a message, so we're in a weird state.
+ # We have to keep processing the message or else it may be lost...
+ # ...and there is also a CancelOk method floating around that we need to consume from the socket
+
+ method = client.last_method
+ subscribe_state = :pending
+ end
end
# Increment message counter
@message_count += 1
# get delivery tag to use for acknowledge
queue.delivery_tag = method.delivery_tag if @ack
-
header = client.next_payload
+ # The unsubscribe ok may be sprinked into the payload
+ if subscribe_state == :pending and header.is_a?(Qrack::Protocol::Basic::CancelOk)
+ # We popped off the CancelOk, so we don't have to keep looking for it
+ subscribe_state = :unsubscribed
+
+ # Get the actual header now
+ header = client.next_payload
+ end
+
# If maximum frame size is smaller than message payload body then message
# will have a message header and several message bodies
msg = ''
while msg.length < header.size
- msg << client.next_payload
+ message = client.next_payload
+
+ # The unsubscribe ok may be sprinked into the payload
+ if subscribe_state == :pending and message.is_a?(Qrack::Protocol::Basic::CancelOk)
+ # We popped off the CancelOk, so we don't have to keep looking for it
+ subscribe_state = :unsubscribed
+ next
+ end
+
+ msg << message
end
# If block present, pass the message info to the block for processing
blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if !blk.nil?
- # Exit loop if message_max condition met
- if (!message_max.nil? and message_count == message_max)
- # Stop consuming messages
- queue.unsubscribe()
+ # Unsubscribe if we've encountered the maximum number of messages
+ if subscribe_state == :subscribed and !message_max.nil? and message_count == message_max
+ queue.unsubscribe
+ subscribe_state = :unsubscribed
+ end
+
+ # Exit the loop if we've unsubscribed
+ if subscribe_state != :subscribed
+ # We still haven't found the CancelOk, so it's the next method
+ if subscribe_state == :pending
+ method = client.next_method
+ client.check_response(method, Qrack::Protocol::Basic::CancelOk, "Error unsubscribing from queue #{queue.name}, got #{method.class}")
+
+ subscribe_state = :unsubscribed
+ end
+
# Acknowledge receipt of the final message
queue.ack() if @ack
+
# Quit the loop
break
end

0 comments on commit 04e71a4

Please sign in to comment.