Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Rework publisher confirms extension to use callbacks

Previous implementation incorrectly assumed that methods in modules are
prepended to the inheritance hierarchy while they are, in fact, appended.

We use a non-standard event name (:after_publish) to avoid clashes. This
may change before 1.0.
  • Loading branch information...
commit 81b089d8749f4b48f7229b2b7e690e69f1003fc0 1 parent ada6981
Michael Klishin michaelklishin authored
2  lib/amq/client/async/exchange.rb
View
@@ -151,6 +151,8 @@ def publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {
headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers)
@connection.send_frameset(Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, immediate, (frame_size || @connection.frame_max)), @channel)
+ # publisher confirms support. MK.
+ @channel.exec_callback(:after_publish)
self
end
38 lib/amq/client/async/extensions/rabbitmq/confirm.rb
View
@@ -57,15 +57,15 @@ module ChannelMixin
attr_writer :publisher_index
# Publisher index is an index of the last message since
- # the confirmations were activated, started with 1. It's
- # incremented by 1 after each Basic.Publish starting at 1.
+ # the confirmations were activated, started with 0. It's
+ # incremented by 1 every time a message is published.
# This is done on both client and server, hence this
# acknowledged messages can be matched via its delivery-tag.
#
# @return [Integer] Current publisher index.
# @api public
def publisher_index
- @publisher_index ||= 1
+ @publisher_index ||= 0
end
# Resets publisher index to 0
@@ -81,8 +81,8 @@ def reset_publisher_index!
# can be actually matched.
#
# @api plugin
- def after_publish(*args)
- self.publisher_index += 1
+ def increment_publisher_index!
+ @publisher_index += 1
end
# Turn on confirmations for this channel and, if given,
@@ -104,7 +104,12 @@ def confirm_select(nowait = false, &block)
end
@uses_publisher_confirmations = true
+ reset_publisher_index!
+
self.redefine_callback(:confirm_select, &block) unless nowait
+ self.redefine_callback(:after_publish) do
+ increment_publisher_index!
+ end
@connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))
self
@@ -210,23 +215,6 @@ def self.included(host)
end
end # self.included(host)
end # ChannelMixin
-
-
- module ExchangeMixin
- # Publish message and then run #after_publish on channel belonging
- # to the exchange. This is used for incrementing the publisher index.
- #
- # @api public
- # @see AMQ::Client::Exchange#publish
- # @see AMQ::Client::Extensions::RabbitMQ::Channel#publisher_index
- # @return [self] self
- def publish(*args, &block)
- super(*args)
- @channel.after_publish(*args, &block)
-
- self
- end # publish
- end # ExchangeMixin
end # Confirm
end # RabbitMQ
end # Extensions
@@ -237,12 +225,6 @@ class Channel
# instead of reckless monkey-patching. MK.
include Extensions::RabbitMQ::Confirm::ChannelMixin
end # Channel
-
- class Exchange
- # use modules, a native Ruby way of extension of existing classes,
- # instead of reckless monkey-patching. MK.
- include Extensions::RabbitMQ::Confirm::ExchangeMixin
- end # Exchange
end # Async
end # Client
end # AMQ
Please sign in to comment.
Something went wrong with that request. Please try again.