DoveMQ as AMQP transport

Tejeswar Das edited this page Jun 9, 2013 · 20 revisions

Introduction

DoveMQ is an AMQP 1.0 based messaging framework.

DoveMQ implements an AMQP message broker and has APIs to support message queuing and pub-sub.

DoveMQ also provides APIs so that it can be used as an AMQP peer-to-peer transport, without the involvement of a Broker. Messages are sent between two peers over an AMQP link. This enables implementing various routing/bridge patterns. A DoveMQ endpoint could, for example, receive a message, transform it, and forward it to other AMQP endpoints. Another DoveMQ endpoint could, for example, implement an AMQP/XMPP bridge.
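The receive, transform, and forward pattern mentioned above can be sketched in plain Java. This is only an illustration under stated assumptions: `RelaySketch`, `MessageSink`, and `Relay` are hypothetical names and are not part of DoveMQ. In a real endpoint, a sink would presumably wrap a DoveMQ Channel, and `onMessage` would be driven by a registered message receiver.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

/** Illustration only: none of these types come from the DoveMQ library. */
final class RelaySketch {
    /** Hypothetical stand-in for anything that can send a payload onward. */
    interface MessageSink {
        void send(byte[] payload);
    }

    /** Receives a payload, transforms it, and forwards the result to every downstream sink. */
    static final class Relay {
        private final UnaryOperator<byte[]> transform;
        private final List<MessageSink> downstream;

        Relay(UnaryOperator<byte[]> transform, List<MessageSink> downstream) {
            this.transform = transform;
            this.downstream = List.copyOf(downstream);
        }

        /** In a real endpoint this would be called from a message-received callback. */
        void onMessage(byte[] payload) {
            byte[] transformed = transform.apply(payload);
            for (MessageSink sink : downstream) {
                sink.send(transformed);
            }
        }
    }

    /** Example transformation: upper-case a UTF-8 text payload. */
    static byte[] upperCase(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    /** Runs one payload through a relay with two in-memory sinks and records what each sink saw. */
    static List<String> demo(String input) {
        List<String> seen = new ArrayList<>();
        MessageSink first = p -> seen.add("first:" + new String(p, StandardCharsets.UTF_8));
        MessageSink second = p -> seen.add("second:" + new String(p, StandardCharsets.UTF_8));
        Relay relay = new Relay(RelaySketch::upperCase, List.of(first, second));
        relay.onMessage(input.getBytes(StandardCharsets.UTF_8));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

Keeping the transformation and the list of sinks separate means the same relay shape could serve a fan-out router or a protocol bridge, depending on what the sinks wrap.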

To establish an AMQP session, one endpoint acts as a listener and accepts AMQP connections, while another endpoint, acting as a client, initiates an AMQP connection. Once the AMQP session is established, there is no distinction between the client and the listener: they become peers, and either endpoint can create a DoveMQ Channel (which encapsulates an AMQP link) on the underlying bidirectional AMQP session. For any particular Channel, one peer is the Sender and the other is the Receiver.

DoveMQ peer-to-peer API

See a very basic example that gives a flavor of the API.

DoveMQ Channel

A DoveMQ Channel is a uni-directional pipe that encapsulates an AMQP Link. One endpoint sends messages over the channel, by calling Channel.send(). The message is asynchronously received by the peer via DoveMQMessageReceiver that's registered with the Channel.

Implement a Channel Endpoint Listener

Each endpoint can play a dual role: for some links it acts as Sender, and for others it acts as Receiver. As a best practice, an endpoint should implement the ChannelEndpointListener interface, which lets it be notified when a peer acting as a Channel sender creates a Channel. An instance of ChannelEndpoint, representing an AMQP link receiver, is passed to ChannelEndpointListener.channelCreated().

Initialize runtime for listener endpoint

To initialize the DoveMQ runtime for a listener endpoint, provide an endpoint name, a TCP listen port, and an implementation of ChannelEndpointListener.

ConnectionFactory.initializeEndpoint("listenerEndpoint", listenPort, new SampleEndpointListener());

Initialize runtime for connecting endpoint

To initialize the DoveMQ runtime for a connector endpoint, provide an endpoint name, and (optionally) an implementation of ChannelEndpointListener (if the connector endpoint also wants to act as a Channel receiver).

ConnectionFactory.initializeClientEndpoint("connector", new SampleEndpointListener());

Create an AMQP session

Create an AMQP connection to the peer listener endpoint, by providing the peer's IP address and TCP listen port. Then, create an AMQP session over the connection.

Connection amqpConnection = ConnectionFactory.createConnectionToAMQPEndpoint(doveMQListenerIP, connectPort);
Session session = amqpConnection.createSession();

Create a DoveMQ Channel

Once the session is created, the sender creates a DoveMQ Channel (that encapsulates an AMQP link), providing a Channel endpoint name.

Channel sender = session.createChannel("SampleChannelEndpoint");

The returned Channel instance represents an AMQP link sender.

When the Channel is established, the peer gets a notification of Channel creation via ChannelEndpointListener.channelCreated().

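private static class SampleEndpointListener implements ChannelEndpointListener {
    @Override
    public void channelCreated(ChannelEndpoint channelEndpoint) {
        // Called when a peer creates a Channel to this endpoint.
        // Register a receiver so that messages on the new link are delivered.
        channelEndpoint.registerMessageReceiver(new SampleMessageReceiver());
    }
}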

The ChannelEndpoint represents a Link Receiver. In ChannelEndpointListener.channelCreated(), the endpoint could register an implementation of DoveMQMessageReceiver with the ChannelEndpoint to receive messages asynchronously over the link.

ChannelEndpoint.getSession() returns the underlying AMQP session that the link was created on. This enables the endpoint to create another link over the underlying session, if needed (for request-response style interaction).
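The request-response interaction described above can be modelled in memory to show how the pieces fit together. This is a sketch under loud assumptions: `SketchSession`, `SketchChannel`, and `SketchEndpoint` are hypothetical stand-ins that only mimic the roles of a session, a link sender, and a link receiver. They are not DoveMQ classes, and delivery here is synchronous, whereas the real receiver is called asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Illustration only: an in-memory model of request-response over one shared session. */
final class RequestResponseSketch {
    /** Stand-in for the receiving end of a link. */
    static final class SketchEndpoint {
        private final SketchSession session;
        private Consumer<String> receiver = msg -> { };

        SketchEndpoint(SketchSession session) { this.session = session; }

        void registerReceiver(Consumer<String> receiver) { this.receiver = receiver; }

        /** Plays the role the text gives to getSession(): the session the link was created on. */
        SketchSession getSession() { return session; }

        void deliver(String msg) { receiver.accept(msg); }
    }

    /** Stand-in for the sending end of a link. */
    static final class SketchChannel {
        private final SketchEndpoint remote;

        SketchChannel(SketchEndpoint remote) { this.remote = remote; }

        void send(String msg) { remote.deliver(msg); }
    }

    /** Stand-in for a bidirectional session on which either peer may open links. */
    static final class SketchSession {
        private final Map<String, Consumer<SketchEndpoint>> listeners = new HashMap<>();

        void onChannelCreated(String peerName, Consumer<SketchEndpoint> listener) {
            listeners.put(peerName, listener);
        }

        /** Opens a link towards targetPeer and notifies that peer's listener, if any. */
        SketchChannel createChannel(String targetPeer) {
            SketchEndpoint endpoint = new SketchEndpoint(this);
            Consumer<SketchEndpoint> listener = listeners.get(targetPeer);
            if (listener != null) {
                listener.accept(endpoint);
            }
            return new SketchChannel(endpoint);
        }
    }

    static List<String> demo() {
        List<String> replies = new ArrayList<>();
        SketchSession session = new SketchSession();

        // Requester: collect whatever arrives on links opened by the peer.
        session.onChannelCreated("requester", endpoint -> endpoint.registerReceiver(replies::add));

        // Responder: open a reply link over the same session, then answer each request on it.
        session.onChannelCreated("responder", endpoint -> {
            SketchChannel replyChannel = endpoint.getSession().createChannel("requester");
            endpoint.registerReceiver(request -> replyChannel.send("echo: " + request));
        });

        SketchChannel requestChannel = session.createChannel("responder");
        requestChannel.send("ping");
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the model is that the responder never needs a separate connection: the reply link is created on the session obtained from the endpoint it was handed.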

Messaging

First, implement the DoveMQMessageReceiver interface:

private static class SampleMessageReceiver implements DoveMQMessageReceiver {
    @Override
    public void messageReceived(DoveMQMessage message) {
        byte[] body = message.getPayload();
        String payload = new String(body);
        System.out.println("Received message: " + payload);
    }
}

Register the message receiver with ChannelEndpoint as soon as a new link is established.

channelEndpoint.registerMessageReceiver(new SampleMessageReceiver());

Now, the ChannelEndpoint is ready to receive messages.

Send message

Create a message, add a payload, and send it over the Channel:

DoveMQMessage message = MessageFactory.createMessage();
String msg = "Hello world";
message.addPayload(msg.getBytes());
sender.sendMessage(message);
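One detail worth noting in the send and receive snippets: `msg.getBytes()` and `new String(body)` both use the JVM's default charset, which may differ between two peers. A minimal sketch of naming the charset explicitly on both sides follows. `PayloadCodecSketch` is a hypothetical helper, not part of DoveMQ, and the choice of UTF-8 is an assumption the two peers would have to agree on.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: a hypothetical helper, not part of the DoveMQ library. */
final class PayloadCodecSketch {
    /** Encodes text for use as a message payload, always as UTF-8. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload, assuming the sender also used UTF-8. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("Hello world");
        System.out.println(decode(wire));
    }
}
```

With such a helper, the sender would pass `PayloadCodecSketch.encode(msg)` as the payload, and the receiver would call `PayloadCodecSketch.decode(body)` on the bytes it gets back.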

Receive message

Upon message receipt, DoveMQMessageReceiver.messageReceived() is called.

Cleanup resources

Close a session

session.close();

Close a connection

amqpConnection.close();

Cleanup runtime

Clean up the DoveMQ runtime as follows. Any open sessions and connections will be closed.

ConnectionFactory.shutdown();
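When the cleanup steps are run in sequence (session, then connection, then runtime), a failure in an early step should not skip the later ones. The sketch below shows one way to guarantee that in plain Java. `CleanupOrderSketch` and `runAll` are hypothetical names, and the steps are in-memory stand-ins; whether the real close and shutdown calls throw, and what they throw, is not stated here and is left as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a hypothetical helper, not part of the DoveMQ library. */
final class CleanupOrderSketch {
    /** Runs every step in order even if an earlier one throws, then rethrows the first failure. */
    static void runAll(Runnable... steps) {
        RuntimeException first = null;
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /** Simulates a failing middle step and records which steps still ran. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        try {
            runAll(
                () -> log.add("session closed"),
                () -> { throw new IllegalStateException("connection already closed"); },
                () -> log.add("runtime shut down"));
        } catch (IllegalStateException e) {
            log.add("reported: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In an application, the three steps would be the session close, the connection close, and the runtime shutdown shown above, each wrapped as a `Runnable`.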