Client API
The API to create or remove a queue is exposed through the logical queue factory (LogicalQFactory), which only needs the address of the global configuration server to start. It maintains a cache of the RoQ cluster topology.
String configServerAddress = "192.23.25.25";
LogicalQFactory factory = new LogicalQFactory(configServerAddress);
factory.createQueue(qName, host);
In the same way, to remove a queue, just call:
factory.removeQueue("queue1");
The client API sits at the boundary of RoQ and lets client callers send messages to queues.
//1. Creating the connection
IRoQConnectionFactory factory = new RoQConnectionFactory(configServerAddress);
IRoQConnection connection = factory.createRoQConnection("queue1");
connection.open();
//2. Creating the publisher and sending message
IRoQPublisher publisher = connection.createPublisher();
// Wait until the connection is established before sending the first message
connection.blockTillReady(10000);
publisher.sendMessage("sabri".getBytes(), "hello".getBytes());
The architecture is shown in the figure below.
The RoQPublisherConnection manages the life cycle of the PublisherConnectionManager thread. This thread is responsible for targeting the right Exchange. The monitor of the queue can notify it through the management channel in order to re-allocate the publisher to another Exchange. Such a re-allocation is triggered either when the Exchange fails or when it is overloaded.
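The re-allocation mechanism described above can be illustrated with a minimal sketch. It does not use the RoQ classes: `PublisherManagerSketch`, its methods, and the exchange addresses are hypothetical names, and a `BlockingQueue` stands in for the management channel, whose real transport is not described here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a publisher connection and its manager thread.
// This is NOT the RoQ implementation, only an illustration of the idea.
class PublisherManagerSketch {
    // Assumption: the management channel is modelled as an in-memory queue
    // on which the (simulated) monitor posts the address of a new Exchange.
    private final BlockingQueue<String> managementChannel = new LinkedBlockingQueue<>();
    private volatile String currentExchange;
    private volatile boolean running;
    private Thread worker;

    PublisherManagerSketch(String initialExchange) {
        this.currentExchange = initialExchange;
    }

    // Starts the manager thread, as opening the connection would.
    void open() {
        running = true;
        worker = new Thread(this::listen, "publisher-connection-manager");
        worker.start();
    }

    // Called by the simulated monitor when the Exchange fails or is overloaded.
    void reallocate(String newExchange) {
        managementChannel.add(newExchange);
    }

    String currentExchange() {
        return currentExchange;
    }

    // Waits until the manager thread has switched to the expected Exchange.
    boolean awaitExchange(String expected, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (expected.equals(currentExchange)) {
                return true;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return expected.equals(currentExchange);
    }

    // Stops the manager thread, as closing the connection would.
    void close() {
        running = false;
        try {
            if (worker != null) {
                worker.join(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return worker != null && worker.isAlive();
    }

    // Manager loop: poll with a timeout so that close() is noticed promptly.
    private void listen() {
        while (running) {
            try {
                String next = managementChannel.poll(50, TimeUnit.MILLISECONDS);
                if (next != null) {
                    currentExchange = next; // re-target the publisher
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class PublisherManagerDemo {
    public static void main(String[] args) {
        PublisherManagerSketch manager = new PublisherManagerSketch("tcp://exchange-a:5571");
        manager.open();
        manager.reallocate("tcp://exchange-b:5571"); // simulated monitor request
        manager.awaitExchange("tcp://exchange-b:5571", 2000);
        System.out.println("Publishing to " + manager.currentExchange());
        manager.close();
    }
}
```

Polling the channel with a short timeout, rather than blocking indefinitely, is one simple way to let the thread observe the stop request; interrupting the thread would be an equally valid design.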
The close method on the RoQConnection will stop the PublisherConnectionManager thread.
The RoQConnectionFactory is responsible for creating RoQConnection instances. It needs the address of the global configuration server in order to look up the monitor associated with the logical queue name to which the publisher must bind. When creating the connection, the factory asks the configuration manager for the monitor host that will handle re-allocation requests. As a result, the PublisherConnectionManager thread can send requests directly to the right monitor thread.
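The lookup performed by the factory can be sketched as follows. This is an illustration under stated assumptions, not the RoQ code: `GlobalConfigStub`, `ConnectionSketch`, and `ConnectionFactorySketch` are hypothetical classes, the configuration server is replaced by an in-memory map, and the monitor address is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the global configuration server.
class GlobalConfigStub {
    private final Map<String, String> monitorByQueue = new HashMap<>();

    void register(String queueName, String monitorAddress) {
        monitorByQueue.put(queueName, monitorAddress);
    }

    // Returns the monitor address for a logical queue, or null if unknown.
    String monitorFor(String queueName) {
        return monitorByQueue.get(queueName);
    }
}

// Hypothetical connection that remembers which monitor to contact.
class ConnectionSketch {
    final String queueName;
    final String monitorAddress;

    ConnectionSketch(String queueName, String monitorAddress) {
        this.queueName = queueName;
        this.monitorAddress = monitorAddress;
    }
}

// Hypothetical factory: resolves the monitor once, at connection creation,
// so the connection can later talk to that monitor directly.
class ConnectionFactorySketch {
    private final GlobalConfigStub config;

    ConnectionFactorySketch(GlobalConfigStub config) {
        this.config = config;
    }

    ConnectionSketch createConnection(String queueName) {
        String monitor = config.monitorFor(queueName);
        if (monitor == null) {
            throw new IllegalStateException("No monitor registered for queue " + queueName);
        }
        return new ConnectionSketch(queueName, monitor);
    }
}

class ConnectionFactoryDemo {
    public static void main(String[] args) {
        GlobalConfigStub config = new GlobalConfigStub();
        config.register("queue1", "tcp://monitor-host:5500"); // made-up address
        ConnectionSketch connection = new ConnectionFactorySketch(config).createConnection("queue1");
        System.out.println(connection.queueName + " is monitored by " + connection.monitorAddress);
    }
}
```

Resolving the monitor at creation time keeps the configuration server out of the message path, which matches the intent described above.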
# Subscriber API
In the same way, the subscriber API needs a connection. This object manages any topological change of the logical queue: if an exchange is added or removed, the connection thread re-configures the underlying connections. Exactly like the publisher connection, the subscriber connection must be opened, and then closed when the client caller has finished receiving messages.
//1. Create the connection with the factory
String configServerAddress = "192.23.25.25";
IRoQConnectionFactory factory = new RoQConnectionFactory(configServerAddress);
//Create a Q connection on the logical queue name queue1 with the filter key sabri.
IRoQSubscriberConnection subConnection = factory.createRoQSubscriberConnection("queue1","sabri");
//2. Open the connection to the logical queue
subConnection.open();
//3. Register a message listener
subConnection.setMessageSubscriber(new IRoQSubscriber() {
    public void onEvent(byte[] msg) {
        String content = new String(msg, 0, msg.length);
        assert content.equals("hello");
        logger.info("In message listener, receiving: " + content);
    }
});
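To make the role of the filter key more concrete, here is a minimal sketch of key-based delivery to a listener. It assumes that a subscriber only receives messages whose key equals its filter key. The actual matching rule used by RoQ is not specified here, and `MessageListenerSketch` and `FilteringSubscriberSketch` are hypothetical names rather than RoQ types.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical listener interface, shaped like the onEvent callback above.
interface MessageListenerSketch {
    void onEvent(byte[] msg);
}

// Hypothetical subscriber that forwards only messages matching its filter key.
class FilteringSubscriberSketch {
    private final byte[] filterKey;
    private MessageListenerSketch listener;

    FilteringSubscriberSketch(String filterKey) {
        this.filterKey = filterKey.getBytes(StandardCharsets.UTF_8);
    }

    void setMessageSubscriber(MessageListenerSketch listener) {
        this.listener = listener;
    }

    // Simulates the arrival of a (key, payload) pair.
    // Returns true if the payload was handed to the listener.
    boolean deliver(byte[] key, byte[] payload) {
        if (listener == null || !Arrays.equals(filterKey, key)) {
            return false; // assumption: exact key match is required
        }
        listener.onEvent(payload);
        return true;
    }
}

class FilteringSubscriberDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        FilteringSubscriberSketch subscriber = new FilteringSubscriberSketch("sabri");
        // Decode with an explicit charset instead of the platform default.
        subscriber.setMessageSubscriber(msg -> received.add(new String(msg, StandardCharsets.UTF_8)));

        subscriber.deliver("sabri".getBytes(StandardCharsets.UTF_8), "hello".getBytes(StandardCharsets.UTF_8));
        subscriber.deliver("other".getBytes(StandardCharsets.UTF_8), "ignored".getBytes(StandardCharsets.UTF_8));
        System.out.println("Received: " + received);
    }
}
```

The sketch decodes payloads with an explicit UTF-8 charset; when publisher and subscriber run on machines with different default encodings, this avoids surprises that `getBytes()` and `new String(bytes)` without a charset can cause.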