RabbitMQ Shovel plugin
Tag: rabbitmq_v2_8_1

RabbitMQ-shovel
===============


Introduction
------------

This is a plug-in for RabbitMQ that shovels messages from a queue on
one broker to an exchange on another broker. The two brokers may be
the same. The plug-in allows several shovels to be specified at the
same time. Each shovel may have a number of source and destination
brokers specified, and one of each is chosen whenever the shovel
attempts to make a connection: this permits simple round-robin load
balancing.

Resources can be declared upon connection to both the source and
destination brokers, and parameters can be specified for both the
reception and publishing of messages.


Requirements
------------

You can build and install it like any other plugin (see
http://www.rabbitmq.com/plugin-development.html).


Configuration
-------------

Shovels are configured in the RabbitMQ configuration file. By default
this is /etc/rabbitmq/rabbitmq.config on Linux systems,
%RABBITMQ_BASE%\rabbitmq.config on Windows, or somewhere else on
OS X. This file configures both RabbitMQ-server and all the plugins
installed in it. It is an Erlang-syntax file of the form:

[{section1, [section1-config]},
 {section2, [section2-config]},
 ...
 {sectionN, [sectionN-config]}
].

thus a list of tuples, where the left element of each tuple names the
application being configured. Remember that the last element of the
list must not have a trailing comma, and that a full stop is required
after the closing bracket of the list. Hence if you configure both
RabbitMQ-server and RabbitMQ-shovel, the configuration file may have
a structure like this:

[{rabbit,          [configuration-for-RabbitMQ-server]},
 {rabbitmq_shovel, [configuration-for-RabbitMQ-shovel]}
].

A full example of the shovel configuration is:

{rabbitmq_shovel,
  [{shovels,
    [{my_first_shovel,
      [{sources,      [{brokers,
                          ["amqp://fred:secret@host1.domain/my_vhost",
                           "amqp://john:secret@host2.domain/my_vhost"
                          ]},
                       {declarations,
                          ['queue.declare',
                           {'queue.bind',
                                  [{exchange, <<"my_exchange">>},
                                   {queue,    <<>>}]}
                          ]}]},
       {destinations, [{broker, "amqp://"},
                       {declarations,
                          [{'exchange.declare',
                                  [{exchange, <<"my_exchange">>},
                                   {type, <<"direct">>},
                                   durable]}
                          ]}]},
       {queue, <<>>},
       {prefetch_count, 10},
       {ack_mode, on_confirm},
       {publish_properties, [{delivery_mode, 2}]},
       {publish_fields, [{exchange, <<"my_exchange">>},
                         {routing_key, <<"from_shovel">>}]},
       {reconnect_delay, 5}
      ]}
     ]
   }]
}

Firstly, all shovels are named. Here we have one shovel, called
'my_first_shovel'. You can define multiple shovels if you wish.
Secondly, every shovel must have the subfields sources, destinations
and queue specified, whereas the other fields (prefetch_count,
ack_mode, publish_properties, publish_fields, reconnect_delay) are
optional and have defaults as follows:

{prefetch_count,      0}
{ack_mode,            on_confirm}
{publish_properties,  []}
{publish_fields,      []}
{reconnect_delay,     5}
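
As a minimal sketch, a shovel that sets only the three mandatory
subfields and relies on the defaults above could look like the
following. The shovel name, the queue name and the destination host
are hypothetical, and the queue is assumed to exist already on the
local broker:

{rabbitmq_shovel,
  [{shovels,
    [{minimal_shovel,                        % hypothetical shovel name
      [{sources,      [{broker, "amqp://"}]},               % local node
       {destinations, [{broker, "amqp://host2.domain"}]},   % placeholder host
       {queue,        <<"work_queue">>}      % assumed to exist already
      ]}
    ]}
  ]}

With no publish_fields given, messages keep the exchange and routing
key they arrived with.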


Sources and Destinations
------------------------

Sources and destinations specify respectively where messages are
fetched from and delivered to. One of 'broker' and 'brokers' must be
specified, and 'broker' is simply shorthand for when only one broker
needs specifying. Using 'brokers' allows a list of brokers to be
specified: whenever the connection to a broker is lost, another one is
chosen at random from the list and a connection attempt is made to
that. The syntax for broker URIs is

amqp://username:password@host:port/vhost?key1=value1&key2=value2...

and is based on the AMQP URI scheme specified at
http://www.rabbitmq.com/uri-spec.html. If the username or password is
omitted, the default values of guest and guest are used. If the vhost
is omitted, the default value of / is used. If the host is omitted,
then the plugin uses the "direct" connection internally rather than a
network connection: it connects to the RabbitMQ-server node on which
it is running without going through the network stack, which is much
more efficient. If the port is omitted then the default value is used
(5672, or 5671 if SSL is used).

SSL is implemented, for which additional parameters are needed:

amqps://username:password@host:port/vhost?cacertfile=/path/to/cacert.pem&certfile=/path/to/certfile.pem&keyfile=/path/to/keyfile.pem&verify=verifyOption&fail_if_no_peer_cert=failOption

All five parameters (3 paths: cacertfile, certfile and keyfile; 2
options: verify, fail_if_no_peer_cert) must be specified. See the SSL
guide at http://www.rabbitmq.com/ssl.html#configure-erlang for details
of SSL in RabbitMQ in general and specifically for the Erlang client
(on which the shovel is built).

Note that SSL cannot be used with the direct connection (i.e. a host
must be specified when using SSL), and that it is preferable to use
the non-SSL direct connection when connecting to the same node that's
running the shovel.
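
For illustration only, a complete SSL URI might read as follows. The
credentials, host and file paths are placeholders, and the values
verify_peer and true are assumed here as typical choices for the two
options; consult the SSL guide for the values appropriate to your
setup:

amqps://fred:secret@host1.domain:5671/my_vhost?cacertfile=/path/to/cacert.pem&certfile=/path/to/certfile.pem&keyfile=/path/to/keyfile.pem&verify=verify_peer&fail_if_no_peer_cert=true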

The query part of the URI permits the configuration of additional
connection parameters, permitting heartbeat, channel_max, and
frame_max to be specified. These can be given in any order, and
omitted fields assume default values. For example:

amqp://myhost?heartbeat=5&frame_max=8192

This specifies a non-encrypted network connection to the host
'myhost', using the default username, password, port, vhost and
channel_max, but with a heartbeat interval of 5 seconds and a maximum
frame size of 8192 bytes.


Resource Declarations
---------------------

Both sources and destinations can have an optional 'declarations'
clause. The value of this is a list, consisting of AMQP Methods. If
default values are sufficient, then the method name alone can be
specified - e.g. 'queue.declare'. If parameters need to be set then
the method should be given as a tuple, with the right hand side a
proplist specifying which fields need altering from their default
values. For example:

    {'exchange.declare',[{exchange, <<"my_exchange">>},
                         {type, <<"direct">>},
                         durable]}

One very useful feature here is the Most-Recently-Declared-Queue
feature, in which RabbitMQ remembers the name of the most recently
declared queue. This means that you can declare a private queue, and
then bind it to exchanges without ever needing to know its name.
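
A sketch of this pattern on the source side is shown below. It assumes
that an exchange called my_exchange already exists on the source
broker, and that the exclusive and auto_delete flags can be written as
bare atoms in the same way as durable above:

    {sources, [{broker, "amqp://"},
               {declarations,
                  [%% server-named private queue
                   {'queue.declare', [exclusive, auto_delete]},
                   %% <<>> refers to the queue just declared
                   {'queue.bind', [{exchange, <<"my_exchange">>},
                                   {queue,    <<>>}]}
                  ]}]}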


queue :: binary
---------------

This parameter specifies the name of the queue on the source brokers
to consume from. This queue must exist. Use the resource declarations
to create the queue (or ensure it exists) first. Note again that the
Most-Recently-Declared-Queue feature can be used here, thus an
anonymous queue can be used: use <<>> to indicate the
Most-Recently-Declared-Queue.


prefetch_count :: non-negative-integer
--------------------------------------

The shovel consumes from a queue. This parameter imposes a limit on
the number of messages which are sent to the shovel in advance of the
message the shovel is currently processing.


ack_mode :: 'no_ack' | 'on_publish' | 'on_confirm'
--------------------------------------------------

This setting controls if or when acknowledgements to the source broker
are issued by the shovel. The default is 'on_confirm'.

'no_ack' - the shovel consumes from its queue with no_ack = 'true':
i.e. the shovel does not issue explicit acks for messages it receives,
and the source broker considers messages acknowledged as soon as it
has sent them to the shovel.

'on_publish' - the shovel consumes from its queue with no_ack =
'false', and a message is acknowledged to the source broker as soon as
it has been published to the destination broker.

'on_confirm' - the shovel consumes from its queue with no_ack =
'false' and it requests publisher confirmations from the destination
broker. It only issues acknowledgements to the source broker when it
has received confirmation from the destination broker that each
message has been successfully received. This setting gives a guarantee
that messages will not be discarded from the source broker until after
the destination broker has confirmed receipt of the message.

'on_confirm' is strongly recommended. However, the destination broker
must support publisher confirms, meaning this mode can only be used
with RabbitMQ 2.3.1 and up.
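
As a sketch, assuming a destination broker that predates publisher
confirms, the shovel could instead acknowledge as soon as it has
published:

    {ack_mode, on_publish}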

publish_properties
------------------

This is a list of tuples which override fields in the basic class
properties when publishing to the destination. This can be used to
override any of the following fields: content_type, content_encoding,
headers, delivery_mode, priority, correlation_id, reply_to,
expiration, message_id, timestamp, type, user_id, app_id, cluster_id.

By default, all the properties of the basic class that are received
with the delivery are passed through to the destination, but this
field can be used to override them.
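
For instance, the following sketch marks every shovelled message as
persistent and overrides its content type. The content type value is
only an illustration:

    {publish_properties, [{delivery_mode, 2},
                          {content_type, <<"application/json">>}]}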


publish_fields
--------------

This is a list of tuples which override fields in the publish method
when publishing to the destination. This can be used to direct
messages to a particular exchange on the destination, for example, or
change the routing key. By default, the exchange and the routing key
of the message as received by the shovel are passed through, but
this can be overridden as necessary.
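
As a sketch, to keep the original exchange but replace the routing key
(the key name here is hypothetical):

    {publish_fields, [{routing_key, <<"archived">>}]}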


reconnect_delay :: non-negative-number
--------------------------------------

When an error occurs, the shovel will disconnect from both the source
and destination broker immediately. This will force uncommitted
transactions at the destination to be rolled back, and delivered but
unacknowledged messages from the source to be requeued. The shovel
will then try connecting again. If this is unsuccessful, then it's not
a good idea for the shovel to very quickly and repeatedly try to
reconnect. The value specified here determines the delay, in
seconds, between each connection attempt.

Note that if set to 0, the shovel will never try to reconnect: it'll
stop after the first error.

Also, the value can be a floating point number, thus permitting the
specification of delays at a granularity smaller than whole seconds.
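
For example, a delay of half a second between connection attempts
(the value is chosen only for illustration) would be written as:

    {reconnect_delay, 0.5}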


Obtaining shovel statuses
-------------------------

Invoke:

$ rabbitmqctl eval 'rabbit_shovel_status:status().'

This will return a list, with one row for each configured shovel. Each
row has three fields: the shovel name, the shovel status, and the
timestamp (a local calendar time of {{YYYY,MM,DD},{HH,MM,SS}}). There
are 3 possible statuses:

'starting': The shovel is starting up, connecting and creating
    resources.

{'running'|'blocked', {'source', Source},
                      {'destination', Destination}}:

    When 'running', the shovel is up and running, shovelling
    messages. When 'blocked', the destination has raised channel.flow,
    preventing the shovel from sending messages to the
    destination. The shovel will raise channel.flow to the source,
    asking the source to stop sending further messages to the
    shovel. Any messages that are received by the shovel before the
    source observes the channel.flow are correctly buffered and
    maintained in order, and are published to the destination as soon
    as the destination drops the channel.flow block.

    Source and Destination give the connection parameters used to
    connect to the endpoints.

{'terminated', Reason}: Something's gone wrong. The Reason should give
    a further indication of where the fault lies.