Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

API reference docs

  • Loading branch information...
commit 5e399246408f5bedcaa091cad0e24f042e0de839 1 parent be768e6
Michael Klishin authored
Showing with 224 additions and 2 deletions.
  1. +224 −2 lib/bunny/channel.rb
View
226 lib/bunny/channel.rb
@@ -527,7 +527,7 @@ def basic_publish(payload, exchange, routing_key, opts = {})
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !opts[:ack]))
+ @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:ack])))
# this is a workaround for the edge case when basic_get is called in a tight loop
# and network goes down we need to perform recovery. The problem is, basic_get will
# keep blocking the thread that calls it without clear way to constantly unblock it
@@ -569,6 +569,10 @@ def basic_qos(prefetch_count, global = false)
@last_basic_qos_ok
end
+ # Redeliver unacknowledged messages
+ #
+ # @param [Boolean] requeue Should messages be requeued?
+ # @return [AMQ::Protocol::Basic::RecoverOk] RabbitMQ response
def basic_recover(requeue)
raise_if_no_longer_open!
@@ -581,8 +585,43 @@ def basic_recover(requeue)
@last_basic_recover_ok
end
- # Acknowledges a delivery (message).
+ # Rejects or requeues a message.
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] requeue Should the message be requeued?
# @return [NilClass] nil
+ #
+ # @example Requeue a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_reject(delivery_info.delivery_tag, true)
+ # end
+ #
+ # @example Reject a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_reject(delivery_info.delivery_tag, false)
+ # end
+ #
+ # @example Requeue a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_reject(delivery_info.delivery_tag, true)
+ #
+ # @see Bunny::Channel#basic_nack
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def basic_reject(delivery_tag, requeue)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
@@ -591,7 +630,43 @@ def basic_reject(delivery_tag, requeue)
end
# Acknowledges a delivery (message).
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] multiple Should all deliveries up to this one be acknowledged?
# @return [NilClass] nil
+ #
+ # @example Ack a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_ack(delivery_info.delivery_tag)
+ # end
+ #
+ # @example Ack a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_ack(delivery_info.delivery_tag)
+ #
+ # @example Ack multiple messages fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # # ack all fetched messages up to payload3
+ # ch.basic_ack(delivery_info.delivery_tag, true)
+ #
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
@@ -599,6 +674,58 @@ def basic_ack(delivery_tag, multiple)
nil
end
+ # Rejects or requeues messages just like {Bunny::Channel#basic_reject} but can do so
+ # with multiple messages at once.
+ #
+ # @param [Integer] delivery_tag Delivery tag obtained from delivery info
+ # @param [Boolean] requeue Should the message be requeued?
+ # @param [Boolean] multiple Should all deliveries up to this one be rejected/requeued?
+ # @return [NilClass] nil
+ #
+ # @example Requeue a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_nack(delivery_info.delivery_tag, true)
+ # end
+ #
+ # @example Reject a message
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # q.subscribe do |delivery_info, properties, payload|
+ # # requeue the message
+ # ch.basic_nack(delivery_info.delivery_tag, false)
+ # end
+ #
+ # @example Requeue a message fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # ch.basic_nack(delivery_info.delivery_tag, true)
+ #
+ #
+ # @example Requeue multiple messages fetched via basic.get
+ # conn = Bunny.new
+ # conn.start
+ #
+ # ch = conn.create_channel
+ # # we assume the queue exists and has messages
+ # _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
+ # # requeue all fetched messages up to payload3
+ # ch.basic_nack(delivery_info.delivery_tag, true, true)
+ #
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
def basic_nack(delivery_tag, requeue, multiple = false)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
@@ -609,6 +736,18 @@ def basic_nack(delivery_tag, requeue, multiple = false)
nil
end
+ # Registers a consumer for queue. Delivered messages will be handled with the block
+ # provided to this method.
+ #
+ # @param [String, Bunny::Queue] queue Queue to consume from
+ # @param [String] consumer_tag Consumer tag (unique identifier), generated by Bunny by default
+ # @param [Boolean] no_ack (false) If false, delivered messages will be automatically acknowledged.
+ # If true, manual acknowledgements will be necessary.
+ # @param [Boolean] exclusive (false) Should this consumer be exclusive?
+ # @param [Hash] arguments (nil) Optional arguments that may be used by RabbitMQ extensions, etc
+ #
+ # @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
@@ -642,6 +781,13 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
@last_basic_consume_ok
end
+ # Registers a consumer for queue as {Bunny::Consumer} instance.
+ #
+ # @param [Bunny::Consumer] consumer Consumer to register. It should already have queue name, consumer tag
+ # and other attributes set.
+ #
+ # @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def basic_consume_with(consumer)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
@@ -672,6 +818,13 @@ def basic_consume_with(consumer)
@last_basic_consume_ok
end
+ # Removes a consumer. Messages for this consumer will no longer be delivered. If the queue
+ # it was on is auto-deleted and this consumer was the last one, the queue will be deleted.
+ #
+ # @param [String] consumer_tag Consumer tag (unique identifier) to cancel
+ #
+ # @return [AMQ::Protocol::Basic::CancelOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
@@ -687,6 +840,22 @@ def basic_cancel(consumer_tag)
# @group Queue operations (queue.*)
+ # Declares a queue using queue.declare AMQP 0.9.1 method.
+ #
+ # @param [String] name Queue name
+ # @param [Hash] opts Queue properties
+ #
+ # @option opts [Boolean] durable (false) Should information about this queue be persisted to disk so that it
+ # can survive broker restarts? Typically set to true for long-lived queues.
+ # @option opts [Boolean] auto_delete (false) Should this queue be deleted when the last consumer is cancelled?
+ # @option opts [Boolean] exclusive (false) Should only this connection be able to use this queue?
+ # If true, the queue will be automatically deleted when this
+ # connection is closed
+ # @option opts [Boolean] passive (false) If true, queue will be checked for existence. If it does not
+ # exist, {Bunny::NotFound} will be raised.
+ #
+ # @return [AMQ::Protocol::Queue::DeclareOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def queue_declare(name, opts = {})
raise_if_no_longer_open!
@@ -705,6 +874,16 @@ def queue_declare(name, opts = {})
@last_queue_declare_ok
end
+ # Deletes a queue using queue.delete AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [Hash] opts Options
+ #
+ # @option opts [Boolean] if_unused (false) Should this queue be deleted only if it has no consumers?
+ # @option opts [Boolean] if_empty (false) Should this queue be deleted only if it has no messages?
+ #
+ # @return [AMQ::Protocol::Queue::DeleteOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def queue_delete(name, opts = {})
raise_if_no_longer_open!
@@ -721,6 +900,12 @@ def queue_delete(name, opts = {})
@last_queue_delete_ok
end
+ # Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
+ #
+ # @param [String] name Queue name
+ #
+ # @return [AMQ::Protocol::Queue::PurgeOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
def queue_purge(name, opts = {})
raise_if_no_longer_open!
@@ -734,6 +919,18 @@ def queue_purge(name, opts = {})
@last_queue_purge_ok
end
+ # Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [String] exchange Exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Queue::BindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
def queue_bind(name, exchange, opts = {})
raise_if_no_longer_open!
@@ -757,6 +954,18 @@ def queue_bind(name, exchange, opts = {})
@last_queue_bind_ok
end
+ # Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
+ #
+ # @param [String] name Queue name
+ # @param [String] exchange Exchange name
+ # @param [Hash] opts Options
+ #
+ # @option opts [String] routing_key (nil) Routing key used for binding
+ # @option opts [Hash] arguments ({}) Optional arguments
+ #
+ # @return [AMQ::Protocol::Queue::UnbindOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
+ # @see http://rubybunny.info/articles/bindings.html Bindings guide
def queue_unbind(name, exchange, opts = {})
raise_if_no_longer_open!
@@ -784,6 +993,19 @@ def queue_unbind(name, exchange, opts = {})
# @group Exchange operations (exchange.*)
+ # Declares a echange using echange.declare AMQP 0.9.1 method.
+ #
+ # @param [String] name Exchange name
+ # @param [Hash] opts Exchange properties
+ #
+ # @option opts [Boolean] durable (false) Should information about this echange be persisted to disk so that it
+ # can survive broker restarts? Typically set to true for long-lived exchanges.
+ # @option opts [Boolean] auto_delete (false) Should this echange be deleted when it is no longer used?
+ # @option opts [Boolean] passive (false) If true, exchange will be checked for existence. If it does not
+ # exist, {Bunny::NotFound} will be raised.
+ #
+ # @return [AMQ::Protocol::Exchange::DeclareOk] RabbitMQ response
+ # @see http://rubybunny.info/articles/echanges.html Exchanges and Publishing guide
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!
Please sign in to comment.
Something went wrong with that request. Please try again.