
Client API

sskhiri edited this page Sep 27, 2012 · 19 revisions

Creating a Queue

The API for creating or removing a queue is exposed through the logical queue factory, which only needs the address of the global configuration server to start. It maintains a cache of the RoQ cluster topology.

String configServerAddress = "192.23.25.25";
LogicalQFactory factory = new LogicalQFactory(configServerAddress);
factory.createQueue(qName, host);

In the same way, to remove a queue, just call:

factory.removeQueue("queue1");

Publisher API

The client API sits at the boundary of RoQ and lets the client caller send messages to queues.

//1. Creating the connection
IRoQConnectionFactory factory = new RoQConnectionFactory(configServerAddress);
IRoQConnection connection = factory.createRoQConnection("queue1");
connection.open();
//2. Creating the publisher and sending message
IRoQPublisher publisher = connection.createPublisher();
// Wait until the connection is established before sending the first message
connection.blockTillReady(10000);
publisher.sendMessage("sabri".getBytes(), "hello".getBytes());

Publisher API architecture

The architecture is shown in the figure below.
Figure: The Publisher API architecture.

The RoQPublisherConnection manages the life cycle of the PublisherConnectionManager thread. This thread is responsible for targeting the right Exchange. The monitor of the queue can notify it through the management channel in order to re-allocate the publisher to another Exchange, either because the current Exchange has failed or because it is overloaded.

The close method on the RoQConnection will stop the PublisherConnectionManager thread.
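The life cycle described above can be pictured with a small stand-alone sketch. It is not RoQ's implementation: the class `ReallocatingManager`, its method names, and the exchange addresses are hypothetical, and an in-memory queue stands in for the management channel. It only illustrates a worker thread that switches its target Exchange when notified and stops when the connection is closed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a connection manager thread; not part of RoQ.
class ReallocatingManager implements Runnable {
    // Sentinel message that asks the worker to stop.
    private static final String STOP = "__STOP__";

    // In-memory substitute for the management channel.
    private final BlockingQueue<String> managementChannel = new LinkedBlockingQueue<>();
    private volatile String currentExchange;
    private final Thread worker;

    ReallocatingManager(String initialExchange) {
        this.currentExchange = initialExchange;
        this.worker = new Thread(this, "publisher-connection-manager");
    }

    // Starts the worker thread (the analogue of opening the connection).
    void open() { worker.start(); }

    // Called on behalf of the monitor to move the publisher to another Exchange.
    void reallocate(String newExchange) { managementChannel.add(newExchange); }

    String currentExchange() { return currentExchange; }

    boolean isRunning() { return worker.isAlive(); }

    // Stops the worker thread and waits for it to finish.
    void close() {
        managementChannel.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                String msg = managementChannel.take();
                if (STOP.equals(msg)) {
                    return;
                }
                currentExchange = msg; // re-target the publisher
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ReallocatingManager manager = new ReallocatingManager("tcp://exchange-a:5559");
        manager.open();
        manager.reallocate("tcp://exchange-b:5559");
        manager.close();
        System.out.println(manager.currentExchange());
    }
}
```

Because the channel is first-in, first-out, a re-allocation queued before `close()` is applied before the thread exits.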

The RoQConnectionFactory is responsible for creating a RoQConnection instance. The factory needs the address of the global configuration server in order to look up the monitor associated with the logical queue name that the publisher must bind to. When creating the connection, the factory asks the configuration manager for the monitor host to which re-allocation requests are addressed. As a result, the PublisherConnectionManager thread can send requests directly to the right monitor thread.

Subscriber API

In the same way, the subscriber API needs a connection. This object manages any topological change of the logical queue: if an exchange is added or removed, the connection thread re-configures the underlying connections. Exactly like the publisher connection, the subscriber connection must be opened before use and closed once the client caller has finished receiving messages.

//1. Create the connection with the factory
String configServerAddress = "192.23.25.25";
IRoQConnectionFactory factory = new RoQConnectionFactory(configServerAddress);
//Create a subscriber connection on the logical queue queue1 with the filter key sabri.
IRoQSubscriberConnection subConnection = factory.createRoQSubscriberConnection("queue1", "sabri");
//2. Open the connection to the logical queue
subConnection.open();
//3. Register a message listener
subConnection.setMessageSubscriber(new IRoQSubscriber() {
    public void onEvent(byte[] msg) {
        String content = new String(msg, 0, msg.length);
        assert content.equals("hello");
        logger.info("In message listener, receiving: " + content);
    }
});

Statistic subscription

Any non-Java process that needs to subscribe to the KPIs sent by the Statistic Monitor server can use a broadcast statistic channel. The only task is to subscribe to the Stat monitor publishing channel. To get the address of this channel, the client can ask the global configuration manager directly.

// 1. Get the location in BSON
// 1.1 Create the request socket
ZMQ.Socket globalConfigReq = context.socket(ZMQ.REQ);
globalConfigReq.connect("tcp://" + this.configurationServer + ":5000");

// 1.2 Send the request
// Prepare the request BSON object
BSONObject request = new BasicBSONObject();
request.put("CMD", RoQConstant.BSON_CONFIG_GET_HOST_BY_QNAME);
request.put("QName", qName);
//Send 
globalConfigReq.send(BSON.encode(request), 0);
byte[] configuration = globalConfigReq.recv(0);

//1.3 decode the host address
BSONObject dConfiguration = BSON.decode(configuration);
String monitorStatServer = (String) dConfiguration.get(RoQConstant.BSON_STAT_MONITOR_HOST);
Assert.assertNotNull(monitorStatServer);

// 2. Register a socket to the stat monitor
kpiSocket = context.socket(ZMQ.SUB);
kpiSocket.connect(monitorStatServer);
kpiSocket.subscribe("".getBytes());

Note that from Java you can still encode the global configuration request as a plain comma-separated string instead of a BSON object:

globalConfigReq.send((RoQConstant.BSON_CONFIG_GET_HOST_BY_QNAME + "," + qName).getBytes(), 0);

Format of the statistics sent

Once we have subscribed to the statistics from the stat monitor of a queue, we only receive information about that queue. From each exchange, we receive a multi-part message in JSON format composed of 3 messages:

    * { "CMD" : 20 , "X_ID" : "XChange 1340470515568"}
    * { "CMD" : 21 , "Minute" : "1" , "TotalProcessed" : "500" , "Processed" : "500" , "TotalThroughput" : "0" , "Throughput" : "3890" , "Producers" : "1"}
    * { "CMD" : 22 , "CPU" : "1.12" , "MEMORY" : "4.6083984375"}

In addition we can receive information at the Logical Queue level:

{ "CMD" : 23 , "QName" : "queueTestStat:5500" , "XChanges" : "1" , "Producers" : "0" , "Throughput" : "0"}

From the subscriber we get:

{
 "CMD" : 31 ,
 "Minute" : "0" ,
 "TotalReceived" : "1000" ,
 "Received" : "0" ,
 "SubsriberID" : "b1a92da6-d9c6-4231-b1b4-70f62590822d" ,
 "MeanLat" : "0"}

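A subscriber on the KPI channel has to tell these frames apart by their `CMD` code. The sketch below shows one minimal way to do so, using only the codes that appear in the samples above (20 to 23 and 31). The class name `StatFrames`, the labels it returns, and the regex-based extraction are illustrative assumptions and not part of RoQ; a real client would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of RoQ: classifies a statistic frame by its CMD code.
class StatFrames {
    // Matches "CMD" : <digits> as it appears in the sample frames.
    private static final Pattern CMD = Pattern.compile("\"CMD\"\\s*:\\s*(\\d+)");

    // Returns the CMD code of a JSON frame, or -1 if none is found.
    static int cmdOf(String json) {
        Matcher m = CMD.matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    // Maps the codes seen in the samples to a human-readable label.
    static String describe(String json) {
        switch (cmdOf(json)) {
            case 20: return "exchange identifier";
            case 21: return "exchange throughput";
            case 22: return "exchange CPU and memory";
            case 23: return "logical queue summary";
            case 31: return "subscriber statistics";
            default: return "unknown";
        }
    }

    public static void main(String[] args) {
        String frame = "{ \"CMD\" : 22 , \"CPU\" : \"1.12\" , \"MEMORY\" : \"4.6083984375\"}";
        System.out.println(describe(frame)); // prints "exchange CPU and memory"
    }
}
```
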
Subscribing to the management controller

The management controller broadcasts the configuration of the RoQ cluster every minute. Any client can connect to the broadcast port 5004 and start receiving the information in JSON.

// 1. ZMQ Init
this.context = ZMQ.context(1);
// 2. Register a socket to the management controller
mngtSubSocket = context.socket(ZMQ.SUB);
mngtSubSocket.connect("tcp://"+globalMgntAddress+":5004");
mngtSubSocket.subscribe("".getBytes());

The information we receive follows this format:

{ "CMD" : 1500}
{ 
  "Queues" :
    [ { "Name" : "queue1" , "Host" : "172.23.107.160" , "State" : false} , 
      { "Name" : "queueTest" , "Host" : "172.23.107.160" , "State" : true} ,
      { "Name" : "queue2" , "Host" : "172.23.107.160" , "State" : true}
    ]
}
{ "Hosts" : [ "172.23.107.160"]}
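To act on this broadcast, a client usually needs the queue names together with their state. The sketch below extracts both from the `Queues` frame. The class name `QueueStates` is hypothetical, and the regular expression assumes the field order shown in the sample (`Name`, `Host`, `State`); a JSON library would be the more robust choice in a real client.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper, not part of RoQ: reads queue states from the "Queues" frame.
class QueueStates {
    // One queue entry, with fields in the order used by the sample broadcast.
    private static final Pattern ENTRY = Pattern.compile(
        "\"Name\"\\s*:\\s*\"([^\"]+)\"\\s*,\\s*\"Host\"\\s*:\\s*\"([^\"]+)\"\\s*,\\s*\"State\"\\s*:\\s*(true|false)");

    // Returns queue name -> state, in the order the queues appear in the frame.
    static Map<String, Boolean> parse(String queuesFrame) {
        Map<String, Boolean> states = new LinkedHashMap<>();
        Matcher m = ENTRY.matcher(queuesFrame);
        while (m.find()) {
            states.put(m.group(1), Boolean.parseBoolean(m.group(3)));
        }
        return states;
    }

    public static void main(String[] args) {
        String frame = "{ \"Queues\" : [ "
            + "{ \"Name\" : \"queue1\" , \"Host\" : \"172.23.107.160\" , \"State\" : false} , "
            + "{ \"Name\" : \"queueTest\" , \"Host\" : \"172.23.107.160\" , \"State\" : true} ] }";
        System.out.println(parse(frame)); // prints {queue1=false, queueTest=true}
    }
}
```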

Management server BSON API

In order to let third-party management clients manage a RoQ cluster, we expose a BSON API on port 5003 of the global configuration manager.

Removing a Queue

The remove request code is 2001; we need to specify the queue name.

{ "CMD" : 2001 , "QName" : "myName"}

The reply contains the result code and a comment. The result code is 0 for SUCCESS and 1101 for FAIL.

{ "RESULT" : 1101 , "COMMENT" : "The queue does not exist"}

Stopping a Queue

The stop request code is 2002; we need to specify the queue name.

{ "CMD" : 2002 , "QName" : "myName"}

The reply contains the result code and a comment. The result code is 0 for SUCCESS and 1101 for FAIL.

{ "RESULT" : 1101 , "COMMENT" : "The queue does not exist"}

Creating a Queue

The create request code is 2003; we need to specify the queue name and the host on which to create it.

{ "CMD" : 2003 , "QName" : "myName", "Host" : "168.0.1.1"}

The reply contains the result code and a comment. The result code is 0 for SUCCESS and 1101 for FAIL.

{ "RESULT" : 0 , "COMMENT" : ""}

Starting a Queue

The start request code is 2005; we need to specify the queue name. If the queue exists and is present in the management server DB, the management controller retrieves its host and sends the start command to the local host manager running on that machine. Note that you can only start a queue that has been stopped, meaning that it is still registered at the management server but its state is set to "false".

{ "CMD" : 2005 , "QName" : "myName"}

The reply contains the result code and a comment. The result code is 0 for SUCCESS and 1101 for FAIL.

{ "RESULT" : 0 , "COMMENT" : "The queue does not exist"}
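The four requests differ only in their command code and, for creation, in the extra `Host` field. The sketch below gathers the codes listed in this section in one place and builds each request as a plain `Map`, which would then be handed to a BSON encoder before being sent to port 5003. The class `MgmtRequests` and its method names are hypothetical; only the keys and numeric codes come from the examples above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper, not part of RoQ: builds management requests as key/value maps.
class MgmtRequests {
    // Command codes documented for the management server.
    static final int REMOVE = 2001;
    static final int STOP = 2002;
    static final int CREATE = 2003;
    static final int START = 2005;

    // Result codes returned in the "RESULT" field.
    static final int SUCCESS = 0;
    static final int FAIL = 1101;

    // Builds a remove, stop or start request for the given queue.
    static Map<String, Object> request(int cmd, String qName) {
        Map<String, Object> req = new LinkedHashMap<>();
        req.put("CMD", cmd);
        req.put("QName", qName);
        return req;
    }

    // Builds a create request, which also carries the target host.
    static Map<String, Object> create(String qName, String host) {
        Map<String, Object> req = request(CREATE, qName);
        req.put("Host", host);
        return req;
    }

    // Returns true when a decoded reply carries the SUCCESS result code.
    static boolean succeeded(Map<String, Object> reply) {
        Object result = reply.get("RESULT");
        return result instanceof Number && ((Number) result).intValue() == SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(create("myName", "168.0.1.1")); // prints {CMD=2003, QName=myName, Host=168.0.1.1}
        System.out.println(request(STOP, "myName"));       // prints {CMD=2002, QName=myName}
    }
}
```

Keeping the codes as named constants avoids scattering the numeric values across client code.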
