Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge commit 'chuckremes/master' into reconnect

* commit 'chuckremes/master':
  Clarified and corrected new #unbind/#unsubscribe/#subscribed? rdoc
  Small patch to add #subscribed? metho to MQ::Queue API
  Removed a debug statement used while creating rdoc
  Reworked all code examples so they use the MQ API instead of the
  Added rdoc to the main classes and modules used for building clients.
  • Loading branch information...
commit 70f360440374a9f503d31623b455125efd4c2bd1 2 parents aed2e35 + 15a9fac
@tmm1 authored
View
7 lib/amqp.rb
@@ -61,7 +61,7 @@ def self.settings
# between the client and the server. Extremely useful for debugging.
#
# AMQP.start do
- # # default to connecting to localhost:5672
+ # # default is to connect to localhost:5672
#
# # define queues, exchanges and bindings here.
# # also define all subscriptions and/or publishers
@@ -71,6 +71,11 @@ def self.settings
# # is called.
# end
#
+ # Most code will use the MQ api. Any calls to MQ.direct / MQ.fanout /
+ # MQ.topic / MQ.queue will implicitly call #start. In those cases,
+ # it is sufficient to put your code inside of an EventMachine.run
+ # block. See the code examples in MQ for details.
+ #
def self.start *args, &blk
EM.run{
@conn ||= connect *args
View
1  lib/amqp/protocol.rb
@@ -66,6 +66,7 @@ def to_frame channel = 0
end
#:startdoc:
+ #
# Contains a properties hash that holds some potentially interesting
# information.
# * :delivery_mode
View
490 lib/mq.rb
@@ -20,23 +20,119 @@ class Error < StandardError; end
# The top-level class for building AMQP clients. This class contains several
# convenience methods for working with queues and exchanges. Many calls
-# delegate/forwards to the appropriate subclass method.
+# delegate/forward to subclasses, but this is the preferred API. The subclass
+# API is subject to change while this high-level API will likely remain
+# unchanged as the library evolves. All code examples will be written using
+# the MQ API.
+#
+# Below is a somewhat complex example that demonstrates several capabilities
+# of the library. The example starts a clock using a +fanout+ exchange which
+# is used for 1 to many communications. Each consumer generates a queue to
+# receive messages and do some operation (in this case, print the time).
+# One consumer prints messages every second while the second consumer prints
+# messages every 2 seconds. After 5 seconds has elapsed, the 1 second
+# consumer is deleted.
+#
+# Of interest is the relationship of EventMachine to the process. All MQ
+# operations must occur within the context of an EM.run block. We start
+# EventMachine in its own thread with an empty block; all subsequent calls
+# to the MQ API add their blocks to the EM.run block. This demonstrates how
+# the library could be used to build up and tear down communications outside
+# the context of an EventMachine block and/or integrate the library with
+# other synchronous operations. See the EventMachine documentation for
+# more information.
+#
+# require 'rubygems'
+# require 'mq'
+#
+# thr = Thread.new { EM.run }
+#
+# # turns on extreme logging
+# #AMQP.logging = true
+#
+# def log *args
+# p args
+# end
+#
+# def publisher
+# clock = MQ.fanout('clock')
+# EM.add_periodic_timer(1) do
+# puts
+#
+# log :publishing, time = Time.now
+# clock.publish(Marshal.dump(time))
+# end
+# end
+#
+# def one_second_consumer
+# MQ.queue('every second').bind(MQ.fanout('clock')).subscribe do |time|
+# log 'every second', :received, Marshal.load(time)
+# end
+# end
+#
+# def two_second_consumer
+# MQ.queue('every 2 seconds').bind('clock').subscribe do |time|
+# time = Marshal.load(time)
+# log 'every 2 seconds', :received, time if time.sec % 2 == 0
+# end
+# end
+#
+# def delete_one_second
+# EM.add_timer(5) do
+# # delete the 'every second' queue
+# log 'Deleting [every second] queue'
+# MQ.queue('every second').delete
+# end
+# end
+#
+# publisher
+# one_second_consumer
+# two_second_consumer
+# delete_one_second
+# thr.join
+#
+# __END__
+#
+# [:publishing, Tue Jan 06 22:46:14 -0600 2009]
+# ["every second", :received, Tue Jan 06 22:46:14 -0600 2009]
+# ["every 2 seconds", :received, Tue Jan 06 22:46:14 -0600 2009]
+#
+# [:publishing, Tue Jan 06 22:46:16 -0600 2009]
+# ["every second", :received, Tue Jan 06 22:46:16 -0600 2009]
+# ["every 2 seconds", :received, Tue Jan 06 22:46:16 -0600 2009]
+#
+# [:publishing, Tue Jan 06 22:46:17 -0600 2009]
+# ["every second", :received, Tue Jan 06 22:46:17 -0600 2009]
+#
+# [:publishing, Tue Jan 06 22:46:18 -0600 2009]
+# ["every second", :received, Tue Jan 06 22:46:18 -0600 2009]
+# ["every 2 seconds", :received, Tue Jan 06 22:46:18 -0600 2009]
+# ["Deleting [every second] queue"]
+#
+# [:publishing, Tue Jan 06 22:46:19 -0600 2009]
+#
+# [:publishing, Tue Jan 06 22:46:20 -0600 2009]
+# ["every 2 seconds", :received, Tue Jan 06 22:46:20 -0600 2009]
+#
class MQ
include AMQP
include EM::Deferrable
# Returns a new channel. A channel is a bidirectional virtual
# connection between the client and the AMQP server. Elsewhere in the
- # library the channel is referred to in parameter lists as 'mq'.
+ # library the channel is referred to in parameter lists as +mq+.
#
- # Optionally takes the result from calling EventMachine::connect.
+ # Optionally takes the result from calling AMQP::connect.
+ #
+ # Rarely called directly by client code. This is implicitly called
+ # by most instance methods. See #method_missing.
#
# EM.run do
# channel = MQ.new
# end
#
# EM.run do
- # channel = MQ.new connect
+ # channel = MQ.new AMQP::connect
# end
#
def initialize connection = nil
@@ -147,62 +243,386 @@ def send *args
}
end
- # A convenience method for defining a direct exchange. See
- # MQ::Exchange.new for details and available options.
+ # Defines, intializes and returns an Exchange to act as an ingress
+ # point for all published messages.
+ #
+ # == Direct
+ # A direct exchange is useful for 1:1 communication between a publisher and
+ # subscriber. Messages are routed to the queue with a binding that shares
+ # the same name as the exchange. Alternately, the messages are routed to
+ # the bound queue that shares the same name as the routing key used for
+ # defining the exchange. This exchange type does not honor the +:key+ option
+ # when defining a new instance with a name. It _will_ honor the +:key+ option
+ # if the exchange name is the empty string.
+ # Allocating this exchange without a name _or_ with the empty string
+ # will use the internal 'amq.direct' exchange.
+ #
+ # Any published message, regardless of its persistence setting, is thrown
+ # away by the exchange when there are no queues bound to it.
+ #
+ # # exchange is named 'foo'
+ # exchange = MQ.direct('foo')
+ #
+ # # or, the exchange can use the default name (amq.direct) and perform
+ # # routing comparisons using the :key
+ # exchange = MQ.direct("", :key => 'foo')
+ # exchange.publish('some data') # will be delivered to queue bound to 'foo'
+ #
+ # queue = MQ.queue('foo')
+ # # can receive data since the queue name and the exchange key match exactly
+ # queue.pop { |data| puts "received data [#{data}]" }
#
- # direct_exch = MQ.direct('foo')
- # # equivalent to
- # direct_exch = MQ::Exchange.new(MQ.new, :direct, 'foo')
+ # == Options
+ # * :passive => true | false (default false)
+ # If set, the server will not create the exchange if it does not
+ # already exist. The client can use this to check whether an exchange
+ # exists without modifying the server state.
+ #
+ # * :durable => true | false (default false)
+ # If set when creating a new exchange, the exchange will be marked as
+ # durable. Durable exchanges remain active when a server restarts.
+ # Non-durable exchanges (transient exchanges) are purged if/when a
+ # server restarts.
+ #
+ # A transient exchange (the default) is stored in memory-only. The
+ # exchange and all bindings will be lost on a server restart.
+ # It makes no sense to publish a persistent message to a transient
+ # exchange.
+ #
+ # Durable exchanges and their bindings are recreated upon a server
+ # restart. Any published messages not routed to a bound queue are lost.
+ #
+ # * :auto_delete => true | false (default false)
+ # If set, the exchange is deleted when all queues have finished
+ # using it. The server waits for a short period of time before
+ # determining the exchange is unused to give time to the client code
+ # to bind a queue to it.
+ #
+ # If the exchange has been previously declared, this option is ignored
+ # on subsequent declarations.
+ #
+ # * :internal => true | false (default false)
+ # If set, the exchange may not be used directly by publishers, but
+ # only when bound to other exchanges. Internal exchanges are used to
+ # construct wiring that is not visible to applications.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
+ # == Exceptions
+ # Doing any of these activities are illegal and will raise MQ:Error.
+ # * redeclare an already-declared exchange to a different type
+ # * :passive => true and the exchange does not exist (NOT_FOUND)
#
def direct name = 'amq.direct', opts = {}
exchanges[name] ||= Exchange.new(self, :direct, name, opts)
end
- # A convenience method for defining a fanout exchange. See
- # MQ::Exchange.new for details and available options.
+ # Defines, intializes and returns an Exchange to act as an ingress
+ # point for all published messages.
+ #
+ # == Fanout
+ # A fanout exchange is useful for 1:N communication where one publisher
+ # feeds multiple subscribers. Like direct exchanges, messages published
+ # to a fanout exchange are delivered to queues whose name matches the
+ # exchange name (or are bound to that exchange name). Each queue gets
+ # its own copy of the message.
+ #
+ # Any published message, regardless of its persistence setting, is thrown
+ # away by the exchange when there are no queues bound to it.
#
- # fanout_exch = MQ.fanout('foo')
- # # equivalent to
- # fanout_exch = MQ::Exchange.new(MQ.new, :fanout, 'foo')
+ # Like the direct exchange type, this exchange type does not honor the
+ # +:key+ option when defining a new instance with a name. It _will_ honor
+ # the +:key+ option if the exchange name is the empty string.
+ # Allocating this exchange without a name _or_ with the empty string
+ # will use the internal 'amq.fanout' exchange.
+ #
+ # EM.run do
+ # clock = MQ.fanout('clock')
+ # EM.add_periodic_timer(1) do
+ # puts "\npublishing #{time = Time.now}"
+ # clock.publish(Marshal.dump(time))
+ # end
+ #
+ # amq = MQ.queue('every second')
+ # amq.bind(MQ.fanout('clock')).subscribe do |time|
+ # puts "every second received #{Marshal.load(time)}"
+ # end
+ #
+ # # note the string passed to #bind
+ # MQ.queue('every 5 seconds').bind('clock').subscribe do |time|
+ # time = Marshal.load(time)
+ # puts "every 5 seconds received #{time}" if time.strftime('%S').to_i%5 == 0
+ # end
+ # end
+ #
+ # == Options
+ # * :passive => true | false (default false)
+ # If set, the server will not create the exchange if it does not
+ # already exist. The client can use this to check whether an exchange
+ # exists without modifying the server state.
+ #
+ # * :durable => true | false (default false)
+ # If set when creating a new exchange, the exchange will be marked as
+ # durable. Durable exchanges remain active when a server restarts.
+ # Non-durable exchanges (transient exchanges) are purged if/when a
+ # server restarts.
+ #
+ # A transient exchange (the default) is stored in memory-only. The
+ # exchange and all bindings will be lost on a server restart.
+ # It makes no sense to publish a persistent message to a transient
+ # exchange.
+ #
+ # Durable exchanges and their bindings are recreated upon a server
+ # restart. Any published messages not routed to a bound queue are lost.
+ #
+ # * :auto_delete => true | false (default false)
+ # If set, the exchange is deleted when all queues have finished
+ # using it. The server waits for a short period of time before
+ # determining the exchange is unused to give time to the client code
+ # to bind a queue to it.
+ #
+ # If the exchange has been previously declared, this option is ignored
+ # on subsequent declarations.
+ #
+ # * :internal => true | false (default false)
+ # If set, the exchange may not be used directly by publishers, but
+ # only when bound to other exchanges. Internal exchanges are used to
+ # construct wiring that is not visible to applications.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
+ # == Exceptions
+ # Doing any of these activities are illegal and will raise MQ:Error.
+ # * redeclare an already-declared exchange to a different type
+ # * :passive => true and the exchange does not exist (NOT_FOUND)
#
def fanout name = 'amq.fanout', opts = {}
exchanges[name] ||= Exchange.new(self, :fanout, name, opts)
end
- # A convenience method for defining a topic exchange. See
- # MQ::Exchange.new for details and available options.
+ # Defines, intializes and returns an Exchange to act as an ingress
+ # point for all published messages.
+ #
+ # == Topic
+ # A topic exchange allows for messages to be published to an exchange
+ # tagged with a specific routing key. The Exchange uses the routing key
+ # to determine which queues to deliver the message. Wildcard matching
+ # is allowed. The topic must be declared using dot notation to separate
+ # each subtopic.
+ #
+ # This is the only exchange type to honor the +key+ hash key for all
+ # cases.
+ #
+ # Any published message, regardless of its persistence setting, is thrown
+ # away by the exchange when there are no queues bound to it.
+ #
+ # As part of the AMQP standard, each server _should_ predeclare a topic
+ # exchange called 'amq.topic' (this is not required by the standard).
+ # Allocating this exchange without a name _or_ with the empty string
+ # will use the internal 'amq.topic' exchange.
+ #
+ # The classic example is delivering market data. When publishing market
+ # data for stocks, we may subdivide the stream based on 2
+ # characteristics: nation code and trading symbol. The topic tree for
+ # Apple Computer would look like:
+ # 'stock.us.aapl'
+ # For a foreign stock, it may look like:
+ # 'stock.de.dax'
#
- # topic_exch = MQ.topic('foo', :key => 'stocks.us')
- # # equivalent to
- # topic_exch = MQ::Exchange.new(MQ.new, :topic, 'foo', :key => 'stocks.us')
+ # When publishing data to the exchange, bound queues subscribing to the
+ # exchange indicate which data interests them by passing a routing key
+ # for matching against the published routing key.
+ #
+ # EM.run do
+ # exch = MQ.topic("stocks")
+ # keys = ['stock.us.aapl', 'stock.de.dax']
+ #
+ # EM.add_periodic_timer(1) do # every second
+ # puts
+ # exch.publish(10+rand(10), :routing_key => keys[rand(2)])
+ # end
+ #
+ # # match against one dot-separated item
+ # MQ.queue('us stocks').bind(exch, :key => 'stock.us.*').subscribe do |price|
+ # puts "us stock price [#{price}]"
+ # end
+ #
+ # # match against multiple dot-separated items
+ # MQ.queue('all stocks').bind(exch, :key => 'stock.#').subscribe do |price|
+ # puts "all stocks: price [#{price}]"
+ # end
+ #
+ # # require exact match
+ # MQ.queue('only dax').bind(exch, :key => 'stock.de.dax').subscribe do |price|
+ # puts "dax price [#{price}]"
+ # end
+ # end
+ #
+ # For matching, the '*' (asterisk) wildcard matches against one
+ # dot-separated item only. The '#' wildcard (hash or pound symbol)
+ # matches against 0 or more dot-separated items. If none of these
+ # symbols are used, the exchange performs a comparison looking for an
+ # exact match.
+ #
+ # == Options
+ # * :passive => true | false (default false)
+ # If set, the server will not create the exchange if it does not
+ # already exist. The client can use this to check whether an exchange
+ # exists without modifying the server state.
+ #
+ # * :durable => true | false (default false)
+ # If set when creating a new exchange, the exchange will be marked as
+ # durable. Durable exchanges remain active when a server restarts.
+ # Non-durable exchanges (transient exchanges) are purged if/when a
+ # server restarts.
+ #
+ # A transient exchange (the default) is stored in memory-only. The
+ # exchange and all bindings will be lost on a server restart.
+ # It makes no sense to publish a persistent message to a transient
+ # exchange.
+ #
+ # Durable exchanges and their bindings are recreated upon a server
+ # restart. Any published messages not routed to a bound queue are lost.
+ #
+ # * :auto_delete => true | false (default false)
+ # If set, the exchange is deleted when all queues have finished
+ # using it. The server waits for a short period of time before
+ # determining the exchange is unused to give time to the client code
+ # to bind a queue to it.
+ #
+ # If the exchange has been previously declared, this option is ignored
+ # on subsequent declarations.
+ #
+ # * :internal => true | false (default false)
+ # If set, the exchange may not be used directly by publishers, but
+ # only when bound to other exchanges. Internal exchanges are used to
+ # construct wiring that is not visible to applications.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
+ # == Exceptions
+ # Doing any of these activities are illegal and will raise MQ:Error.
+ # * redeclare an already-declared exchange to a different type
+ # * :passive => true and the exchange does not exist (NOT_FOUND)
#
def topic name = 'amq.topic', opts = {}
exchanges[name] ||= Exchange.new(self, :topic, name, opts)
end
- # Convenience method for creating or retrieving a queue reference. Wraps
- # calls to MQ::Queue. See the MQ::Queue class definition for the
- # allowable options.
+ # Queues store and forward messages. Queues can be configured in the server
+ # or created at runtime. Queues must be attached to at least one exchange
+ # in order to receive messages from publishers.
+ #
+ # Like an Exchange, queue names starting with 'amq.' are reserved for
+ # internal use. Attempts to create queue names in violation of this
+ # reservation will raise MQ:Error (ACCESS_REFUSED).
+ #
+ # It is not supported to create a queue without a name; some string
+ # (even the empty string) must be passed in the +name+ parameter.
#
- # queue = MQ.queue('bar', :durable => true)
+ # == Options
+ # * :passive => true | false (default false)
+ # If set, the server will not create the exchange if it does not
+ # already exist. The client can use this to check whether an exchange
+ # exists without modifying the server state.
+ #
+ # * :durable => true | false (default false)
+ # If set when creating a new queue, the queue will be marked as
+ # durable. Durable queues remain active when a server restarts.
+ # Non-durable queues (transient queues) are purged if/when a
+ # server restarts. Note that durable queues do not necessarily
+ # hold persistent messages, although it does not make sense to
+ # send persistent messages to a transient queue (though it is
+ # allowed).
#
- # Equivalent to writing:
- # channel = MQ.new
- # queue = MQ::Queue.new(channel, 'bar', :durable => true)
+ # Again, note the durability property on a queue has no influence on
+ # the persistence of published messages. A durable queue containing
+ # transient messages will flush those messages on a restart.
+ #
+ # If the queue has already been declared, any redeclaration will
+ # ignore this setting. A queue may only be declared durable the
+ # first time when it is created.
+ #
+ # * :exclusive => true | false (default false)
+ # Exclusive queues may only be consumed from by the current connection.
+ # Setting the 'exclusive' flag always implies 'auto-delete'. Only a
+ # single consumer is allowed to remove messages from this queue.
+ #
+ # The default is a shared queue. Multiple clients may consume messages
+ # from this queue.
+ #
+ # Attempting to redeclare an already-declared queue as :exclusive => true
+ # will raise MQ:Error.
+ #
+ # * :auto_delete = true | false (default false)
+ # If set, the queue is deleted when all consumers have finished
+ # using it. Last consumer can be cancelled either explicitly or because
+ # its channel is closed. If there was no consumer ever on the queue, it
+ # won't be deleted.
+ #
+ # The server waits for a short period of time before
+ # determining the queue is unused to give time to the client code
+ # to bind an exchange to it.
+ #
+ # If the queue has been previously declared, this option is ignored
+ # on subsequent declarations.
+ #
+ # Any remaining messages in the queue will be purged when the queue
+ # is deleted regardless of the message's persistence setting.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
#
def queue name, opts = {}
queues[name] ||= Queue.new(self, name, opts)
end
- # Convenience method for creating or retrieving an RPC (remote procedure
- # call) reference. Wraps calls to MQ::RPC. See the MQ::RPC class definition
- # for the allowable options.
+ # Takes a channel, queue and optional object.
#
- # remote_proc = MQ.rpc('bar', Hash.new)
+ # The optional object may be a class name, module name or object
+ # instance. When given a class or module name, the object is instantiated
+ # during this setup. The passed queue is automatically subscribed to so
+ # it passes all messages (and their arguments) to the object.
#
- # Equivalent to writing:
- # channel = MQ.new
- # remote_proc = MQ::RPC.new(channel, 'bar', Hash.new)
+ # Marshalling and unmarshalling the objects is handled internally. This
+ # marshalling is subject to the same restrictions as defined in the
+ # Marshal[http://ruby-doc.org/core/classes/Marshal.html] standard
+ # library. See that documentation for further reference.
+ #
+ # When the optional object is not passed, the returned rpc reference is
+ # used to send messages and arguments to the queue. See #method_missing
+ # which does all of the heavy lifting with the proxy. Some client
+ # elsewhere must call this method *with* the optional block so that
+ # there is a valid destination. Failure to do so will just enqueue
+ # marshalled messages that are never consumed.
+ #
+ # EM.run do
+ # server = MQ.rpc('hash table node', Hash)
+ #
+ # client = MQ.rpc('hash table node')
+ # client[:now] = Time.now
+ # client[:one] = 1
+ #
+ # client.values do |res|
+ # p 'client', :values => res
+ # end
+ #
+ # client.keys do |res|
+ # p 'client', :keys => res
+ # EM.stop_event_loop
+ # end
+ # end
#
def rpc name, obj = nil
rpcs[name] ||= RPC.new(self, name, obj)
@@ -259,6 +679,8 @@ def rpcs
end
# Queue objects keyed on their consumer tags.
+ #
+ # Not typically called by client code.
def consumers
@consumers ||= {}
end
@@ -299,6 +721,8 @@ def MQ.default
Thread.current[:mq] ||= MQ.new
end
+ # Allows for calls to all MQ instance methods. This implicitly calls
+ # MQ.new so that a new channel is allocated for subsequent operations.
def MQ.method_missing meth, *args, &blk
MQ.default.__send__(meth, *args, &blk)
end
View
26 lib/mq/exchange.rb
@@ -7,6 +7,18 @@ class MQ
# It determines the next delivery hop by examining the bindings associated
# with the exchange.
#
+ # There are three (3) supported Exchange types: direct, fanout and topic.
+ #
+ # As part of the standard, the server _must_ predeclare the direct exchange
+ # 'amq.direct' and the fanout exchange 'amq.fanout' (all exchange names
+ # starting with 'amq.' are reserved). Attempts to declare an exchange using
+ # 'amq.' as the name will raise an MQ:Error and fail. In practice these
+ # default exchanges are never used directly by client code.
+ #
+ # These predececlared exchanges are used when the client code declares
+ # an exchange without a name. In these cases the library will use
+ # the default exchange for publishing the messages.
+ #
class Exchange
include AMQP
@@ -181,7 +193,7 @@ def initialize mq, type, name, opts = {}
@type, @name, @opts = type, name, opts
@mq.exchanges[@name = name] ||= self
@key = opts[:key]
-
+
@mq.callback{
@mq.send Protocol::Exchange::Declare.new({ :exchange => name,
:type => type,
@@ -195,8 +207,7 @@ def initialize mq, type, name, opts = {}
# configuration and distributed to any active consumers when the
# transaction, if any, is committed.
#
- # channel = MQ.new
- # exchange = MQ::Exchange.new(channel, :direct, 'direct', :key => 'foo.bar')
+ # exchange = MQ.direct('name', :key => 'foo.bar')
# exchange.publish("some data")
#
# The method takes several hash key options which modify the behavior or
@@ -224,7 +235,12 @@ def initialize mq, type, name, opts = {}
#
# * :persistent
# True or False. When true, this message will remain in the queue until
- # it is consumed. When false, the message will be deleted.
+ # it is consumed (if the queue is durable). When false, the message is
+ # lost if the server restarts and the queue is recreated.
+ #
+ # For high-performance and low-latency, set :persistent => false so the
+ # message stays in memory and is never persisted to non-volatile (slow)
+ # storage.
#
def publish data, opts = {}
@mq.callback{
@@ -253,7 +269,7 @@ def publish data, opts = {}
# Further attempts to publish messages to a deleted exchange will raise
# an MQ::Error due to a channel close exception.
#
- # exchange = MQ::Exchange.new(channel, :direct, 'direct', :key => 'foo.bar')
+ # exchange = MQ.direct('name', :key => 'foo.bar')
# exchange.delete
#
# == Options
View
95 lib/mq/queue.rb
@@ -80,11 +80,18 @@ def initialize mq, name, opts = {}
#
# A valid exchange name (or reference) must be passed as the first
# parameter. Both of these are valid:
- # exch = MQ::Exchange.new(MQ.new, :direct, 'foo exchange')
- # queue = MQ::Queue.new(MQ.new, 'bar queue')
+ # exch = MQ.direct('foo exchange')
+ # queue = MQ.queue('bar queue')
# queue.bind('foo.exchange') # OR
# queue.bind(exch)
#
+ # It is not valid to call #bind without the +exchange+ parameter.
+ #
+ # It is unnecessary to call #bind when the exchange name and queue
+ # name match exactly (for +direct+ and +fanout+ exchanges only).
+ # There is an implicit bind which will deliver the messages from
+ # the exchange to the queue.
+ #
# == Options
# * :key => 'some string'
# Specifies the routing key for the binding. The routing key is
@@ -112,6 +119,20 @@ def bind exchange, opts = {}
self
end
+ # Remove the binding between the queue and exchange. The queue will
+ # not receive any more messages until it is bound to another
+ # exchange.
+ #
+ # Due to the asynchronous nature of the protocol, it is possible for
+ # "in flight" messages to be received after this call completes.
+ # Those messages will be serviced by the last block used in a
+ # #subscribe or #pop call.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
def unbind exchange, opts = {}
exchange = exchange.respond_to?(:name) ? exchange.name : exchange
@bindings.delete exchange
@@ -162,31 +183,33 @@ def delete opts = {}
# The provided block is passed a single message each time pop is called.
#
# EM.run do
- # exchange = MQ::Exchange.new(MQ.new, :direct, "foo queue")#, :key => 'foo queue')
+ # exchange = MQ.direct("foo queue")
# EM.add_periodic_timer(1) do
# exchange.publish("random number #{rand(1000)}")
# end
#
- # queue = MQ::Queue.new(MQ.new, 'foo queue')
+ # # note that #bind is never called; it is implicit because
+ # # the exchange and queue names match
+ # queue = MQ.queue('foo queue')
# queue.pop { |body| puts "received payload [#{body}]" }
#
# EM.add_periodic_timer(1) { queue.pop }
# end
#
- # If the block takes 2 parameters, both the headers and the body will
- # be passed in for processing. The headers object is defined by
+ # If the block takes 2 parameters, both the +header+ and the +body+ will
+ # be passed in for processing. The header object is defined by
# AMQP::Protocol::Header.
#
# EM.run do
- # exchange = MQ::Exchange.new(MQ.new, :direct, "foo queue")#, :key => 'foo queue')
+ # exchange = MQ.direct("foo queue")
# EM.add_periodic_timer(1) do
# exchange.publish("random number #{rand(1000)}")
# end
#
- # queue = MQ::Queue.new(MQ.new, 'foo queue')
+ # queue = MQ.queue('foo queue')
# queue.pop do |header, body|
# p header
- # puts "received payload [#{body}]" }
+ # puts "received payload [#{body}]"
# end
#
# EM.add_periodic_timer(1) { queue.pop }
@@ -233,29 +256,31 @@ def pop opts = {}, &blk
# exchange matches a message to this queue.
#
# EM.run do
- # exchange = MQ::Exchange.new(MQ.new, :direct, "foo queue")#, :key => 'foo queue')
+ # exchange = MQ.direct("foo queue")
# EM.add_periodic_timer(1) do
# exchange.publish("random number #{rand(1000)}")
# end
#
- # queue = MQ::Queue.new(MQ.new, 'foo queue')
+ # queue = MQ.queue('foo queue')
# queue.subscribe { |body| puts "received payload [#{body}]" }
# end
#
- # If the block takes 2 parameters, both the headers and the body will
- # be passed in for processing. The headers object is defined by
+ # If the block takes 2 parameters, both the +header+ and the +body+ will
+ # be passed in for processing. The header object is defined by
# AMQP::Protocol::Header.
#
# EM.run do
- # exchange = MQ::Exchange.new(MQ.new, :direct, "foo queue")#, :key => 'foo queue')
+ # exchange = MQ.direct("foo queue")
# EM.add_periodic_timer(1) do
# exchange.publish("random number #{rand(1000)}")
# end
#
- # queue = MQ::Queue.new(MQ.new, 'foo queue')
+ # # note that #bind is never called; it is implicit because
+ # # the exchange and queue names match
+ # queue = MQ.queue('foo queue')
# queue.subscribe do |header, body|
# p header
- # puts "received payload [#{body}]" }
+ # puts "received payload [#{body}]"
# end
# end
#
@@ -277,7 +302,7 @@ def subscribe opts = {}, &blk
@consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
@mq.consumers[@consumer_tag] = self
- raise Error, 'already subscribed to the queue' if @on_msg
+ raise Error, 'already subscribed to the queue' if subscribed?
@on_msg = blk
@on_msg_opts = opts
@@ -292,6 +317,27 @@ def subscribe opts = {}, &blk
self
end
+ # Removes the subscription from the queue and cancels the consumer.
+ # New messages will not be received by the queue. This call is similar
+ # in result to calling #unbind.
+ #
+ # Due to the asynchronous nature of the protocol, it is possible for
+ # "in flight" messages to be received after this call completes.
+ # Those messages will be serviced by the last block used in a
+ # #subscribe or #pop call.
+ #
+ # Additionally, if the queue was created with _autodelete_ set to
+ # true, the server will delete the queue after its wait period
+ # has expired unless the queue is bound to an active exchange.
+ #
+ # The method accepts a block which will be executed when the
+ # unsubscription request is acknowledged as complete by the server.
+ #
+ # * :nowait => true | false (default true)
+ # If set, the server will not respond to the method. The client should
+ # not wait for a reply method. If the server could not complete the
+ # method it will raise a channel or connection exception.
+ #
def unsubscribe opts = {}, &blk
@on_msg = nil
@on_cancel = blk
@@ -304,12 +350,27 @@ def unsubscribe opts = {}, &blk
def publish data, opts = {}
exchange.publish(data, opts)
end
+
+ # Boolean check to see if the current queue has already been subscribed
+ # to an exchange.
+ #
+ # Attempts to #subscribe multiple times to any exchange will raise an
+ # Exception. Only a single block at a time can be associated with any
+ # one queue for processing incoming messages.
+ #
+ def subscribed?
+ !!@on_msg
+ end
# Passes the message to the block passed to pop or subscribe.
#
# Performs an arity check on the block's parameters. If arity == 1,
# pass only the message body. If arity != 1, pass the headers and
# the body to the block.
+ #
+ # See AMQP::Protocol::Header for the hash properties available from
+ # the headers parameter. See #pop or #subscribe for a code example.
+ #
def receive headers, body
# XXX why is this here?
if AMQP.closing
View
10 lib/mq/rpc.rb
@@ -4,9 +4,9 @@ class MQ
# Needs more detail and explanation.
#
# EM.run do
- # server = MQ::RPC.new(MQ.new, 'hash table node', Hash)
+ # server = MQ.rpc('hash table node', Hash)
#
- # client = MQ::RPC.new(MQ.new, 'hash table node')
+ # client = MQ.rpc('hash table node')
# client[:now] = Time.now
# client[:one] = 1
#
@@ -74,13 +74,13 @@ def initialize mq, queue, obj = nil
end
end
- # Calling MQ::RPC.new(*args) returns a proxy object without any methods beyond
+ # Calling MQ.rpc(*args) returns a proxy object without any methods beyond
# those in Object. All calls to the proxy are handled by #method_missing which
# works to marshal and unmarshal all method calls and their arguments.
#
# EM.run do
- # server = MQ::RPC.new(MQ.new, 'hash table node', Hash)
- # client = MQ::RPC.new(MQ.new, 'hash table node')
+ # server = MQ.rpc('hash table node', Hash)
+ # client = MQ.rpc('hash table node')
#
# # calls #method_missing on #[] which marshals the method name and
# # arguments to publish them to the remote
Please sign in to comment.
Something went wrong with that request. Please try again.