Browse files

update to latest DSB WSN and monitoring

  • Loading branch information...
1 parent b2964f0 commit ce11fed366e8fd88ebb5f71210b2acb1bafb6e1d @chamerling chamerling committed Nov 15, 2012
View
4 play-dsb-wsn/src/main/java/org/ow2/play/dsb/wsn/component/Component.java
@@ -185,7 +185,7 @@ protected EventGovernance getEventGovernance(String reg)
EventGovernance.class);
}
- protected synchronized MonitoringService getMonitoringService() {
+ public synchronized MonitoringService getMonitoringService() {
if (monitoringService == null) {
try {
monitoringService = new JSONMonitoringService(
@@ -210,7 +210,7 @@ private String getMonitoringEndpoint() throws RegistryException {
.getClientFromFinalURL(reg, Registry.class);
String monitoring = registry.get(Constants.MONITORING_DSB_WSN);
if (monitoring == null) {
- result = "http://localhost:3000/monitoring/dsb/wsn/";
+ result = "http://localhost:3000/api/v1/monitoring/wsn/";
} else {
result = monitoring;
}
View
84 play-dsb-wsn/src/main/java/org/ow2/play/dsb/wsn/component/JSONMonitoringService.java
@@ -22,6 +22,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.jws.WebMethod;
+
import org.petalslink.dsb.jbi.se.wsn.api.MonitoringService;
import org.petalslink.dsb.jbi.se.wsn.api.Topic;
import org.petalslink.dsb.jbi.se.wsn.api.WSNException;
@@ -64,7 +66,7 @@ public void newInNotifyError(final String uuid, Document payload,
logger.fine("Error while delivering notification to " + to);
}
- MonitoringInfo info = new MonitoringInfo();
+ MonitoringNotificationInfo info = new MonitoringNotificationInfo();
info.uuid = uuid;
info.error = e.getMessage();
info.timestamp = timestamp;
@@ -80,7 +82,7 @@ public void newInNotifyInput(String uuid, Document payload, Topic topic,
logger.fine("New newInNotifyInput on topic " + topic);
}
- MonitoringInfo info = new MonitoringInfo();
+ MonitoringNotificationInfo info = new MonitoringNotificationInfo();
info.uuid = uuid;
info.timestamp = timestamp;
info.topic = topic;
@@ -96,7 +98,7 @@ public void newInNotifyOutput(String uuid, Document payload, Topic topic,
logger.fine("New newInNotifyOutput on topic " + topic);
}
- MonitoringInfo info = new MonitoringInfo();
+ MonitoringNotificationInfo info = new MonitoringNotificationInfo();
info.uuid = uuid;
info.timestamp = timestamp;
info.topic = topic;
@@ -112,7 +114,7 @@ public void newOutNotify(String uuid, Document payload, String to,
+ to);
}
- MonitoringInfo info = new MonitoringInfo();
+ MonitoringNotificationInfo info = new MonitoringNotificationInfo();
info.uuid = uuid;
info.timestamp = timestamp;
info.topic = topic;
@@ -129,7 +131,7 @@ public void newOutNotifyError(String uuid, Document paylaod, String to,
+ " while sending to " + to);
}
- MonitoringInfo info = new MonitoringInfo();
+ MonitoringNotificationInfo info = new MonitoringNotificationInfo();
info.uuid = uuid;
info.timestamp = timestamp;
info.topic = topic;
@@ -138,8 +140,75 @@ public void newOutNotifyError(String uuid, Document paylaod, String to,
info.error = e.getMessage();
post(info);
}
+
+ @Override
+ public void newSubscribeRequest(String uuid, String subscriber, Topic topic) throws WSNException {
+ MonitoringSubscriptionInfo info = new MonitoringSubscriptionInfo();
+ info.uuid = uuid;
+ info.subscriber = subscriber;
+ info.topic = topic;
+ info.type = "newSubscribeRequest";
+ info.timestamp = System.currentTimeMillis();
+ post(info);
+ }
+
+ @Override
+ public void newSubscribeResponse(String uuid, String subscriptionID) throws WSNException {
+ MonitoringSubscriptionInfo info = new MonitoringSubscriptionInfo();
+ info.uuid = uuid;
+ info.subscriptionID = subscriptionID;
+ info.type = "newSubscribeResponse";
+ info.timestamp = System.currentTimeMillis();
+ post(info);
+ }
+
+ @Override
+ public void newUnsubscribeRequest(String uuid, String subscriptionID) throws WSNException {
+ MonitoringSubscriptionInfo info = new MonitoringSubscriptionInfo();
+ info.uuid = uuid;
+ info.subscriptionID = subscriptionID;
+ info.type = "newUnsubscribeRequest";
+ info.timestamp = System.currentTimeMillis();
+ post(info);
+ }
+
+ @Override
+ public void newUnsubscribeResponse(String uuid) throws WSNException {
+ MonitoringSubscriptionInfo info = new MonitoringSubscriptionInfo();
+ info.uuid = uuid;
+ info.type = "newUnsubscribeResponse";
+ info.timestamp = System.currentTimeMillis();
+ post(info);
+ }
+
+ protected void post(final MonitoringNotificationInfo info) {
+ try {
+ getAsyncHttpClient().preparePost(endpoint)
+ .addHeader("Content-Type", "application/json")
+ .setBody(gson.toJson(info))
+ .execute(new AsyncCompletionHandler<Response>() {
+
+ @Override
+ public Response onCompleted(Response response)
+ throws Exception {
+ logger.fine("Request is complete for UUID "
+ + info.uuid);
+ return response;
+ }
- protected void post(final MonitoringInfo info) {
+ @Override
+ public void onThrowable(Throwable t) {
+ logger.warning("HTTP failure for UUID '"
+ + info.uuid + "' : " + t.getMessage());
+ }
+ });
+ } catch (Exception ex) {ex.printStackTrace();
+ logger.warning("Error while sending monitoring information : "
+ + ex.getMessage());
+ }
+ }
+
+ protected void post(final MonitoringSubscriptionInfo info) {
try {
getAsyncHttpClient().preparePost(endpoint)
.addHeader("Content-Type", "application/json")
@@ -160,7 +229,7 @@ public void onThrowable(Throwable t) {
+ info.uuid + "' : " + t.getMessage());
}
});
- } catch (Exception ex) {
+ } catch (Exception ex) {ex.printStackTrace();
logger.warning("Error while sending monitoring information : "
+ ex.getMessage());
}
@@ -173,5 +242,4 @@ private synchronized AsyncHttpClient getAsyncHttpClient() {
}
return client;
}
-
}
View
2 ...lay/dsb/wsn/component/MonitoringInfo.java → ...component/MonitoringNotificationInfo.java
@@ -25,7 +25,7 @@
* @author chamerling
*
*/
-public class MonitoringInfo {
+public class MonitoringNotificationInfo {
/**
* Correlation UUID
View
65 play-dsb-wsn/src/main/java/org/ow2/play/dsb/wsn/component/MonitoringSubscriptionInfo.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Copyright (c) 2012, PetalsLink
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ */
+package org.ow2.play.dsb.wsn.component;
+
+import org.petalslink.dsb.jbi.se.wsn.api.Topic;
+
+/**
+ * @author chamerling
+ *
+ */
+public class MonitoringSubscriptionInfo {
+
+ /**
+ * Correlation UUID
+ */
+ public String uuid;
+
+ /**
+ * The message type
+ */
+ public String type;
+
+ /**
+ *
+ */
+ public Topic topic;
+
+ /**
+ * server timestamp
+ */
+ public long timestamp;
+
+ /**
+ * Target endpoint
+ */
+ public String to;
+
+ /**
+ * Source endpoint
+ */
+ public String subscriber;
+
+ /**
+ * Subscription ID
+ */
+ public String subscriptionID;
+
+}
View
75 play-dsb-wsn/src/test/java/org/ow2/play/dsb/wsn/component/JSONMonitoringServiceTest.java
@@ -20,6 +20,7 @@
package org.ow2.play.dsb.wsn.component;
import java.io.IOException;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -62,54 +63,63 @@ protected void tearDown() throws Exception {
super.tearDown();
}
- public void testJSON() {
-
-
+ public void testJSON() throws WSNException {
+
final AtomicLong counter = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(100);
- /*
- Server server = new Server(8038);
- server.setHandler(new AbstractHandler() {
-
- @Override
- public void handle(String target, Request baseRequest,
- HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
- System.out.println("Got request : "
- + IOUtils.toString(request.getInputStream()));
- // counter.incrementAndGet();
- latch.countDown();
- }
- });
- try {
- server.start();
- } catch (Exception e) {
- fail();
- }
- */
-
JSONMonitoringService service = new JSONMonitoringService(
- "http://localhost:3000/monitoring/dsb/wsn/");
+ "http://localhost:3000/api/v1/monitoring/wsn/");
Topic t = new Topic();
t.name = "TestTopic";
t.ns = "http://play.ow2.org";
t.prefix = "play";
long start = System.currentTimeMillis();
+
+ service.newInNotifyInput(UUID.randomUUID().toString(), null, t,
+ System.currentTimeMillis());
+
+ Random r = new Random();
+ int j = 0;
+
for (int i = 0; i < 10000; i++) {
+ if (j++ == 10) {
+ j = 0;
+ }
try {
+
long a = System.currentTimeMillis();
+ service.newInNotifyError(UUID.randomUUID().toString(), null,
+ "toin", t, System.currentTimeMillis(), new Exception(
+ "OHOOH"));
+
+ try {
+ Thread.sleep(r.nextInt(500));
+ } catch (InterruptedException e) {
+ }
+
+ t.name = "TestTopic" + j;
+ service.newOutNotifyError(UUID.randomUUID().toString(), null,
+ "http://localhost:" + j, t, System.currentTimeMillis(),
+ new Exception("AHAHA"));
+ try {
+ Thread.sleep(r.nextInt(500));
+ } catch (InterruptedException e) {
+ }
service.newInNotifyInput(UUID.randomUUID().toString(), null, t,
System.currentTimeMillis());
- System.out.println("Sent in " + (System.currentTimeMillis() - a));
+ service.newInNotifyOutput(UUID.randomUUID().toString(), null,
+ t, System.currentTimeMillis());
+
+ try {
+ Thread.sleep(r.nextInt(500));
+ } catch (InterruptedException e) {
+ }
+ System.out.println(i + " : Sent in "
+ + (System.currentTimeMillis() - a));
+
} catch (WSNException e) {
}
- try {
- Thread.sleep(133);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
}
System.out.println("SENT in " + (System.currentTimeMillis() - start));
@@ -119,6 +129,7 @@ public void handle(String target, Request baseRequest,
e.printStackTrace();
fail();
}
+
System.out.println("Done in " + (System.currentTimeMillis() - start));
}

0 comments on commit ce11fed

Please sign in to comment.