Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

RabbitMQ External Exchange plugin

Fetching latest commit…

Octocat-spinner-32-eaf2f5

Cannot retrieve the latest commit at this time

Octocat-spinner-32 examples
Octocat-spinner-32 src
Octocat-spinner-32 .hgignore
Octocat-spinner-32 LICENSE-MPL-rabbitmq-external-exchange
Octocat-spinner-32 Makefile
Octocat-spinner-32 README
Octocat-spinner-32 package.mk
README
RabbitMQ External Exchange Plugin
=================================


WARNING
-------

This plugin is now unmaintained, and will probably not even compile
against recent versions of RabbitMQ. It is left here for historical
interest.


Overview
--------

This plugin allows you to write implementations of exchange types as
normal AMQP clients of RabbitMQ using any of the multitude of AMQP
clients available.

Example frameworks for both Java (using the RabbitMQ Java client) and
Erlang (using the RabbitMQ Erlang client) are provided to demonstrate
the cleanliness of API that can be achieved.


Introduction
------------

AMQP provides 4 basic exchange types: "fanout", "direct", "topic" and
"headers". It also provides a means to allow custom exchange types to
be used, requiring that any such custom exchange types start with a
"x-" prefix.

RabbitMQ provides pluggable exchange types, which permit new exchange
types to be provided by filling in a thin API. This API allows the
custom exchange type to be notified of exchange creation, deletion,
the addition and removal of bindings, and the publishing messages to
an exchange. The custom exchange may, upon being told of a publish,
modify the message or any of its meta-data, and route it to any queue.

However, the custom exchange is called directly by each channel, and
so does not serialise events, making it a little challenging (though
by no means overly tricky) to maintain state. It also requires the
exchange to be written in Erlang and to run directly in the same
Erlang VM as RabbitMQ (though note that plugins such as the
script-exchange allow for functions written in scripting languages
such as Javascript to be run on every publish).

This plugin takes the events relating to exchanges, encapsulates them
in AMQP and performs RPC to an external client. This allows further
choice of language, flexibility of implementation, and permits all
events to be processed by a single thread, thus making it easier for
the exchange type implementation to be stateful (though it's by no
means impossible to achieve a stateful exchange without this plugin).


Using the External Exchange
---------------------------

The plugin registers a new exchange type called "x-ee". You can then
create new exchanges with this type. The external exchange client is
then responsible for choosing which queues any message published to
the exchange are send the message.


Architecture
------------

The external exchange plugin creates a new "topic" exchange called
"external-exchange". To this exchange, it sends all events that happen
to any exchange with type "x-ee". The messages it sends always have
the routing-key of the message set to the name of the exchange to
which the event applies, and always have the reply_to field of the
basic class properties set. The value of this field should be used as
the routing_key of any message that the external exchange client needs
to send back. Such messages should be sent to the default (blank)
exchange.

In general it is recommended that the external exchange client
consumes from the "external-exchange" setting QoS prefetch to 1. The
messages received by the external exchange client should be explicitly
acknowledged.


Encapsulation Protocol
----------------------

AMQP includes a basic codec for custom headers in the class
properties. This is used extensively to send and receive values to and
from the external exchange client. The following specifies which
fields will be supplied, and what the expected response is, if any,
for all messages sent by the external exchange plugin to the
"external-exchange" topic exchange (and hence to the external exchange
client).

* On exchange creation:
Basic class properties headers:
 [{"action",      "create" :: long_string},
  {"durable",     IsDurable :: bool},
  {"auto_delete", IsAutoDelete :: bool},
  {"arguments",   Arguments :: table}]
Message body: none
The client must NOT respond to this message.
The fields "durable", "auto_delete" and "arguments" specify the values
supplied during exchange creation. The routing-key of the message
contains the exchange name.

* On exchange deletion:
Basic class properties headers:
 [{"action",      "create" :: long_string}]
Message body: none
The client must NOT respond to this message.
The routing-key of the message contains the exchange name.

* On binding
Basic class properties headers:
 [{"action",      "add_binding" :: long_string},
  {"queue_name",  QueueName :: long_string},
  {"key",         BindingKey :: long_string}]
Message body: none
The client must NOT respond to this message.
The fields "queue_name" and "key" specify the queue name and the
binding key specified in the creation of the binding. The routing-key
of the message contains the exchange name.

* On unbinding
Basic class properties headers:
 [{"action",      "remove_bindings" :: long_string},
  {"bindings",  [{{"queue_name", QueueName :: long_string},
                  {"key",        BindingKey :: long_string}} :: table] :: array}]
Message body: none
The client must NOT respond to this message.
The value of the field "binding" is an array, of which every element
is a table, which contains the fields "queue_name" and "key" which
specify the queue name and binding key of each binding that is being
removed. The routing-key of the message contains the exchange name.

* On publish
 [{"action",      "publish" :: long_string},
  {"routing_keys", RoutingKeys :: [long_string]}]
Message body: This is the body of the message being published
The client MUST respond to this message.
The field "routing_keys" indicates the routing keys of the message that
is being published. The routing-key of the message contains the
exchange name.
The response to must be published to the default exchange, with the
routing key set to the value of the "reply_to" field in the basic
class properties received with this message. The following field must
be set in the headers of the basic class properties of the message
sent back:
[{"queue_names", [QueueName :: long_string] :: array}]
The queue names field is an array of long_strings, each element being
the name of a queue to which the message should be routed. The
original message, with the original routing_keys is published to the
queue_names returned.


Example Frameworks
------------------

Two example frameworks are provided which abstract above the protocol
described above. One is in Java, and the other in Erlang. The Java
framework presents the following interface:

public interface ExchangeType {

        Set<String> publish(String exchangeName, byte[] body,
                        String[] routingKeys);

        void create(String exchangeName, boolean durable, boolean autoDelete,
                        Map<String, Object> arguments);

        void delete(String exchangeName);

        void addBinding(String exchangeName, Binding binding);

        void removeBindings(String exchangeName, Binding[] bindings);
}

The example also includes a demonstration of using this interface in
the ExampleExchangeType class, which boils down to implementing the
interface, extending the ExchangeTypeImpl, creating an instance and
running it:

        Connection conn = new ConnectionFactory().newConnection();
        String eName = "test-exchange";

        ExampleExchangeType eet = new ExampleExchangeType(conn, eName);
        new Thread(eet).start();

The Erlang interface has a very similar (if functional) behaviour
rabbit_external_exchange_type which has the following callbacks specs:

-type(exchange_name(), string()).
-type(binding_key(), string()).
-type(routing_key(), string()).
-type(queue_name(), string()).
-type(binding(), {binding_key(), queue_name()}).
-type(payload(), binary()).
-type(state(), any()).

-spec(init/1 :: ([any()]) -> state()).
-spec(publish/4 :: (exchange_name(), [routing_key()], payload(), state()) ->
                        {[queue_name()], state()}).
-spec(create/5 :: (exchange_name(), boolean(), boolean(), amqp_table(), state())
                  -> state()).
-spec(delete/2 :: (exchange_name(), state()) -> state()).
-spec(add_binding/3 :: (exchange_name(), binding(), state()) -> state()).
-spec(remove_bindings/3 :: (exchange_name(), [binding()], state()) -> state()).
-spec(terminate/2 :: (any(), state()) -> any()).

Using the API amounts to implementing the
rabbit_external_exchange_type behaviour in a module, and providing
that module to the rabbit_external_exchange_driver:

    XName = <<"test-exchange">>,
    {ok, Conn} = amqp_connection:start(network),
    Res = rabbit_external_exchange_driver:start_link(Conn, XName, Module, []),

The last parameter to rabbit_external_exchange_driver:start_link/4 is
a list of arguments which are passed through to the Module's init/1
function (very similar to gen_server's start_link and init/1
callback).

Of course, you can actually embed this framework as a plugin within
RabbitMQ and then use the direct connection from the Erlang client!

The reason for providing an exchange name to the constructors of both
framework APIs is that this is used as the binding key to the
"external-exchange" topic exchange. Of course, you may use the
catch-all key of "#" which will only permit you to use one external
exchange client. By setting this binding key more narrowly, you permit
different external exchange clients to be used with different
exchanges of the "x-ee" type.

Both of these frameworks can be used as is, but should also serve as
demonstration implementations (and thus further documentation) of the
client portion of the above protocol encapsulation. Thus other client
implementations in other languages for which AMQP client libraries
exist should be reasonably easy to create.
Something went wrong with that request. Please try again.