Permalink
Browse files

API reference docs

  • Loading branch information...
1 parent 2512d84 commit 7cd8c7fa6f78a65c1cbe765dc73021dd7806ba28 Michael Klishin committed Jan 18, 2013
Showing with 99 additions and 3 deletions.
  1. +99 −3 lib/bunny/channel.rb
View
102 lib/bunny/channel.rb
@@ -184,6 +184,7 @@ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
# Opens the channel and resets its internal state
# @return [Bunny::Channel] Self
+ # @api public
def open
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
@@ -200,17 +201,20 @@ def open
# Closes the channel. Closed channels can no longer be used (this includes associated
# {Bunny::Queue}, {Bunny::Exchange} and {Bunny::Consumer} instances.
+ # @api public
def close
@connection.close_channel(self)
closed!
end
# @return [Boolean] true if this channel is open, false otherwise
+ # @api public
def open?
@status == :open
end
# @return [Boolean] true if this channel is closed (manually or because of an exception), false otherwise
+ # @api public
def closed?
@status == :closed
end
@@ -262,6 +266,7 @@ def frame_size
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def fanout(name, opts = {})
Exchange.new(self, :fanout, name, opts)
end
@@ -279,6 +284,7 @@ def fanout(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def direct(name, opts = {})
Exchange.new(self, :direct, name, opts)
end
@@ -296,6 +302,7 @@ def direct(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def topic(name, opts = {})
Exchange.new(self, :topic, name, opts)
end
@@ -313,12 +320,14 @@ def topic(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @api public
def headers(name, opts = {})
Exchange.new(self, :headers, name, opts)
end
# Provides access to the default exchange
- # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @api public
def default_exchange
self.direct(AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
@@ -357,6 +366,7 @@ def exchange(name, opts = {})
#
# @return [Bunny::Queue] Queue that was declared or looked up in the cache
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
q = find_queue(name) || Bunny::Queue.new(self, name, opts)
@@ -374,6 +384,7 @@ def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
# @param [Integer] prefetch_count Prefetch (QoS setting) for this channel
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def prefetch(prefetch_count)
self.basic_qos(prefetch_count, false)
end
@@ -382,11 +393,13 @@ def prefetch(prefetch_count)
# channel.
#
# @param [Boolean] active Should messages to consumers on this channel be delivered?
+ # @api public
def flow(active)
channel_flow(active)
end
# Tells RabbitMQ to redeliver unacknowledged messages
+ # @api public
def recover(ignored = true)
# RabbitMQ only supports basic.recover with requeue = true
basic_recover(true)
@@ -406,6 +419,7 @@ def recover(ignored = true)
# @see Bunny::Channel#ack
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def reject(delivery_tag, requeue = false)
basic_reject(delivery_tag, requeue)
end
@@ -416,6 +430,7 @@ def reject(delivery_tag, requeue = false)
# @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well?
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def ack(delivery_tag, multiple = false)
basic_ack(delivery_tag, multiple)
end
@@ -470,6 +485,7 @@ def on_error(&block)
# @option opts [String] :app_id Optional application ID
#
# @return [Bunny::Channel] Self
+ # @api public
def basic_publish(payload, exchange, routing_key, opts = {})
raise_if_no_longer_open!
@@ -524,6 +540,7 @@ def basic_publish(payload, exchange, routing_key, opts = {})
# ch.acknowledge(delivery_info.delivery_tag)
# @see Bunny::Queue#pop
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
@@ -553,6 +570,7 @@ def basic_get(queue, opts = {:ack => true})
# @param [AMQ::Protocol::Basic::QosOk] basic.qos-ok response
# @see Bunny::Channel#prefetch
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_qos(prefetch_count, global = false)
raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
raise_if_no_longer_open!
@@ -573,6 +591,7 @@ def basic_qos(prefetch_count, global = false)
#
# @param [Boolean] requeue Should messages be requeued?
# @return [AMQ::Protocol::Basic::RecoverOk] RabbitMQ response
+ # @api public
def basic_recover(requeue)
raise_if_no_longer_open!
@@ -622,6 +641,7 @@ def basic_recover(requeue)
#
# @see Bunny::Channel#basic_nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_reject(delivery_tag, requeue)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
@@ -667,6 +687,7 @@ def basic_reject(delivery_tag, requeue)
# ch.basic_ack(delivery_info.delivery_tag, true)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
@@ -726,6 +747,7 @@ def basic_ack(delivery_tag, multiple)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def basic_nack(delivery_tag, requeue, multiple = false)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
@@ -748,6 +770,7 @@ def basic_nack(delivery_tag, requeue, multiple = false)
#
# @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
@@ -788,6 +811,7 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
#
# @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_consume_with(consumer)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
@@ -825,6 +849,7 @@ def basic_consume_with(consumer)
#
# @return [AMQ::Protocol::Basic::CancelOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
@@ -856,6 +881,7 @@ def basic_cancel(consumer_tag)
#
# @return [AMQ::Protocol::Queue::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_declare(name, opts = {})
raise_if_no_longer_open!
@@ -884,6 +910,7 @@ def queue_declare(name, opts = {})
#
# @return [AMQ::Protocol::Queue::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_delete(name, opts = {})
raise_if_no_longer_open!
@@ -906,6 +933,7 @@ def queue_delete(name, opts = {})
#
# @return [AMQ::Protocol::Queue::PurgeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def queue_purge(name, opts = {})
raise_if_no_longer_open!
@@ -931,6 +959,7 @@ def queue_purge(name, opts = {})
# @return [AMQ::Protocol::Queue::BindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @api public
def queue_bind(name, exchange, opts = {})
raise_if_no_longer_open!
@@ -966,6 +995,7 @@ def queue_bind(name, exchange, opts = {})
# @return [AMQ::Protocol::Queue::UnbindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @api public
def queue_unbind(name, exchange, opts = {})
raise_if_no_longer_open!
@@ -1006,6 +1036,7 @@ def queue_unbind(name, exchange, opts = {})
#
# @return [AMQ::Protocol::Exchange::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/echanges.html Exchanges and Publishing guide
+ # @api public
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!
@@ -1035,6 +1066,7 @@ def exchange_declare(name, type, opts = {})
#
# @return [AMQ::Protocol::Exchange::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @api public
def exchange_delete(name, opts = {})
raise_if_no_longer_open!
@@ -1050,6 +1082,21 @@ def exchange_delete(name, opts = {})
@last_exchange_delete_ok
end
+ # Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension
+ # that RabbitMQ provides.
+ #
+ # @param [String] source Source exchange name
+ # @param [String] destination Destination exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Exchange::BindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def exchange_bind(source, destination, opts = {})
raise_if_no_longer_open!
@@ -1079,6 +1126,21 @@ def exchange_bind(source, destination, opts = {})
@last_exchange_bind_ok
end
+ # Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension
+ # that RabbitMQ provides.
+ #
+ # @param [String] source Source exchange name
+ # @param [String] destination Destination exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Exchange::UnbindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def exchange_unbind(source, destination, opts = {})
raise_if_no_longer_open!
@@ -1114,6 +1176,16 @@ def exchange_unbind(source, destination, opts = {})
# @group Flow control (channel.*)
+ # Enables or disables message flow for the channel. When message flow is disabled,
+ # no new messages will be delivered to consumers on this channel. This is typically
+ # used by consumers that cannot keep up with the influx of messages.
+ #
+ # @note Recent (e.g. 2.8.x., 3.x) RabbitMQ will employ TCP/IP-level back pressure on publishers if it detects
+ # that consumers do not keep up with them.
+ #
+ # @return [AMQ::Protocol::Channel::FlowOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @api public
def channel_flow(active)
raise_if_no_longer_open!
@@ -1133,6 +1205,8 @@ def channel_flow(active)
# @group Transactions (tx.*)
# Puts the channel into transaction mode (starts a transaction)
+ # @return [AMQ::Protocol::Tx::SelectOk] RabbitMQ response
+ # @api public
def tx_select
raise_if_no_longer_open!
@@ -1146,6 +1220,8 @@ def tx_select
end
# Commits current transaction
+ # @return [AMQ::Protocol::Tx::CommitOk] RabbitMQ response
+ # @api public
def tx_commit
raise_if_no_longer_open!
@@ -1159,6 +1235,8 @@ def tx_commit
end
# Rolls back current transaction
+ # @return [AMQ::Protocol::Tx::RollbackOk] RabbitMQ response
+ # @api public
def tx_rollback
raise_if_no_longer_open!
@@ -1178,11 +1256,18 @@ def tx_rollback
# @group Publisher Confirms (confirm.*)
# @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise
+ # @api public
def using_publisher_confirmations?
@next_publish_seq_no > 0
end
- # Enables publisher confirms
+ # Enables publisher confirms for the channel.
+ # @return [AMQ::Protocol::Confirm::SelectOk] RabbitMQ response
+ # @see #wait_for_confirms
+ # @see #unconfirmed_set
+ # @see #nacked_set
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def confirm_select(callback=nil)
raise_if_no_longer_open!
@@ -1204,7 +1289,14 @@ def confirm_select(callback=nil)
end
# Blocks calling thread until confirms are received for all
- # currently unacknowledged published messages
+ # currently unacknowledged published messages.
+ #
+ # @return [Boolean] true if all messages were acknowledged positively, false otherwise
+ # @see #confirm_select
+ # @see #unconfirmed_set
+ # @see #nacked_set
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
+ # @api public
def wait_for_confirms
wait_on_confirms_continuations
@@ -1238,6 +1330,8 @@ def generate_consumer_tag(name = "bunny")
# Recovery
#
+ # @group Network Failure Recovery
+
# Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure
# Recovery feature.
#
@@ -1296,6 +1390,8 @@ def recover_consumers
end
end
+ # @endgroup
+
#
# Implementation

0 comments on commit 7cd8c7f

Please sign in to comment.