Permalink
Browse files

Merge pull request #241 from Narigo/eventbusbridge-with-extra-hooks

Ability to create a listener for events on the EventBusBridge
  • Loading branch information...
2 parents 1d08f97 + c69c839 commit c0976fc7f8080506b3d739d55d031bfdad54b332 @purplefox purplefox committed Aug 21, 2012
View
@@ -125,6 +125,9 @@ task assemble(type: Copy, dependsOn: ['preflight', subprojects.assemble]) {
into('examples/ruby/eventbusbridge') {
from file("src/dist/client/vertxbus.js")
}
+ into('examples/javascript/eventbusbridgelistener') {
+ from file("src/dist/client/vertxbus.js")
+ }
into('examples/groovy/webapp/web/js') {
from file("src/dist/client/vertxbus.js")
}
@@ -0,0 +1,51 @@
+package org.vertx.java.core.sockjs;
+
+import org.vertx.java.core.json.JsonObject;
+
+/**
+ * A listener for events on the EventBusBridge. Register an instance of this class on the SockJSServer to intercept
+ * events on the EventBusBridge.
+ */
+public interface EventBusBridgeListener {
+
+ /**
+ * This method gets fired when a message gets sent to an address.
+ * @param writeHandlerId The socket ID of the client who sent the message.
+ * @param address The address this message was sent to.
+ * @param message The sent message.
+ * @return True, if the message should still be sent to the address, false otherwise.
+ */
+ boolean sendingMessage(String writeHandlerId, final String address, JsonObject message);
+
+ /**
+ * This method gets fired when a message gets published to an address.
+ * @param writeHandlerId The socket ID of the client who sent the message.
+ * @param address The address this message was sent to.
+ * @param message The sent message.
+ * @return True, if the message should still be published to the address, false otherwise.
+ */
+ boolean publishingMessage(String writeHandlerId, final String address, JsonObject message);
+
+ /**
+ * This method gets fired when a client registered to an address. It should return whether the handler should still
+ * be registered.
+ * @param writeHandlerId The socket ID of the client who registered on the address.
+ * @param address The address the newly registered handler listens on.
+ * @return True, if the handler should still be registered, false otherwise.
+ */
+ boolean registeringHandler(String writeHandlerId, final String address);
+
+ /**
+ * This method gets fired when a client unregistered a handler for an address.
+ * @param writeHandlerId The socket ID of the client who registered on the address.
+ * @param address The address the unregistered handler listened on.
+ */
+ void unregisteredHandler(String writeHandlerId, final String address);
+
+ /**
+ * This method gets fired when a client disconnects.
+ * @param writeHandlerId The socket ID of the client who disconnected.
+ */
+ void clientDisconnected(String writeHandlerId);
+
+}
@@ -93,5 +93,12 @@ void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outbound
void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout, String authAddress);
+ /**
+ * Sets a listener for sending, publishing, registering, unregistering and disconnect events on the bridge.
+ * @param listener A listener for all the possible events.
+ * @return The SockJSServer instance for chaining.
+ */
+ SockJSServer setEventBusBridgeListener(EventBusBridgeListener listener);
+
}
@@ -31,6 +31,7 @@
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
+import org.vertx.java.core.sockjs.EventBusBridgeListener;
import org.vertx.java.core.sockjs.SockJSServer;
import org.vertx.java.core.sockjs.SockJSSocket;
@@ -57,6 +58,8 @@
private WebSocketMatcher wsMatcher = new WebSocketMatcher();
private final Map<String, Session> sessions;
+ private EventBusBridgeListener bridgeHook = null;
+
public DefaultSockJSServer(final VertxInternal vertx, final HttpServer httpServer) {
this.vertx = vertx;
this.sessions = vertx.sharedData().getMap("_vertx.sockjssessions");
@@ -180,17 +183,23 @@ public void handle(HttpServerRequest req) {
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook));
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted, authTimeout);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout));
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout, String authAddress) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted, authTimeout, authAddress);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout, authAddress));
+ }
+
+ @Override
+ public SockJSServer setEventBusBridgeListener(EventBusBridgeListener bridgeHook) {
+ this.bridgeHook = bridgeHook;
+ return this;
}
private Handler<HttpServerRequest> createChunkingTestHandler() {
@@ -28,7 +28,7 @@
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
-import org.vertx.java.core.sockjs.SockJSServer;
+import org.vertx.java.core.sockjs.EventBusBridgeListener;
import org.vertx.java.core.sockjs.SockJSSocket;
import java.util.ArrayList;
@@ -58,6 +58,7 @@
private final Map<SockJSSocket, Set<String>> sockAuths = new HashMap<>();
private final List<JsonObject> inboundPermitted;
private final List<JsonObject> outboundPermitted;
+ private final EventBusBridgeListener bridgeHook;
private final long authTimeout;
private final String authAddress;
private final Vertx vertx;
@@ -76,19 +77,19 @@
return l;
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted) {
- this(vertx, sjsServer, sjsConfig, inboundPermitted, outboundPermitted, DEFAULT_AUTH_TIMEOUT, null);
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook) {
+ this(vertx, inboundPermitted, outboundPermitted, bridgeHook, DEFAULT_AUTH_TIMEOUT, null);
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted,
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook,
long authTimeout) {
- this(vertx, sjsServer, sjsConfig, inboundPermitted, outboundPermitted, authTimeout, null);
+ this(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout, null);
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted,
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook,
long authTimeout,
String authAddress) {
this.vertx = vertx;
@@ -103,7 +104,7 @@
authAddress = DEFAULT_AUTH_ADDRESS;
}
this.authAddress = authAddress;
- sjsServer.installApp(sjsConfig, this);
+ this.bridgeHook = bridgeHook;
}
public void handle(final SockJSSocket sock) {
@@ -113,6 +114,10 @@ public void handle(final SockJSSocket sock) {
sock.endHandler(new SimpleHandler() {
public void handle() {
+ if (bridgeHook != null) {
+ bridgeHook.clientDisconnected(sock.writeHandlerID);
+ }
+
// On close unregister any handlers that haven't been unregistered
for (Map.Entry<String, Handler<Message<JsonObject>>> entry: handlers.entrySet()) {
eb.unregisterHandler(entry.getKey(), entry.getValue());
@@ -161,16 +166,37 @@ public void handle(Buffer data) {
String address = getMandatoryString(msg, "address");
switch (type) {
case "send":
- sendOrPub(true, msg, address);
+ if (bridgeHook != null) {
+ if (bridgeHook.sendingMessage(sock.writeHandlerID, address, msg)) {
+ sendOrPub(true, msg, address);
+ }
+ } else {
+ sendOrPub(true, msg, address);
+ }
break;
case "publish":
- sendOrPub(false, msg, address);
+ if (bridgeHook != null) {
+ if (bridgeHook.publishingMessage(sock.writeHandlerID, address, msg)) {
+ sendOrPub(false, msg, address);
+ }
+ } else {
+ sendOrPub(false, msg, address);
+ }
break;
case "register":
- handleRegister(address);
+ if (bridgeHook != null) {
+ if (bridgeHook.registeringHandler(sock.writeHandlerID, address)) {
+ handleRegister(address);
+ }
+ } else {
+ handleRegister(address);
+ }
break;
case "unregister":
handleUnregister(address);
+ if (bridgeHook != null) {
+ bridgeHook.unregisteredHandler(sock.writeHandlerID, address);
+ }
break;
default:
throw new IllegalStateException("Invalid type: " + type);
@@ -255,6 +255,24 @@ To run it, open one or more browsers and point them to http://localhost:8080.
First connect, then try subscribing and sending messages and see how the separate browsers can interoperate on the event bus.
+## Eventbus Bridge Listener
+
+This example shows how the vert.x event bus listener can intercept messages from clients.
+
+To run the server:
+
+vertx run eventbusbridgelistener/bridge_server.js
+
+The example shows how the server can listen to events of client actions.
+
+You can always see that the server intercepts these messages by clicking "Refresh Log".
+
+To run it, open one or more browsers and point them to http://localhost:8080.
+
+First connect, then try sending/publishing messages or registering/unregistering handlers. You can see how the server intercepts these messages by refreshing the log. You are able to see with different browsers, that send will always do round robin. Publishing will be censored (never shown in the messages, only in the log).
+
+Registering a handler at "secret" will not be allowed, so you can see that even though the client can try to register a handler, it won't be able to see message sent to the address.
+
## Web application
This is a full end-end "real-time" web appplication which has a modern JavaScript client side MVVM application that communicates via the event bus with a persistor.
@@ -0,0 +1,45 @@
+load('vertx.js')
+
+var server = vertx.createHttpServer()
+var sockjsServer;
+
+var log = [];
+
+// Serve the static resources
+server.requestHandler(function(req) {
+ if (req.uri == "/") req.response.sendFile("eventbusbridgelistener/index.html")
+ if (req.uri == "/log.json") req.response.end(JSON.stringify(log));
+ if (req.uri == "/vertxbus.js") req.response.sendFile("eventbusbridgelistener/vertxbus.js")
+});
+
+
+var myListener = {
+ sendingMessage: function(clientId, address, message) {
+ log.push(clientId + ' sent ' + message.encode() + ' to ' + address);
+ return true; // let the client send the message
+ },
+ publishingMessage: function(clientId, address, message) {
+ log.push(clientId + ' tried to publish ' + message.encode() + ' to ' + address);
+ return false; // don't let any publish methods through
+ },
+ registeringHandler: function(clientId, address) {
+ if (address === 'secret') {
+ log.push('did not allow ' + clientId + ' registering at ' + address);
+ return false;
+ } else {
+ log.push(clientId + ' registered at ' + address);
+ return true;
+ }
+ },
+ unregisteredHandler: function(clientId, address) {
+ log.push(clientId + ' unregistered handler at ' + address);
+ },
+ clientDisconnected: function(clientId) {
+ log.push(clientId + ' disconnected');
+ }
+};
+
+// Create a SockJS bridge which lets everything through (be careful!)
+vertx.createSockJSServer(server).setEventBusBridgeListener(myListener).bridge({prefix: "/eventbus"}, [{}], [{}]);
+
+server.listen(8080);
Oops, something went wrong.

0 comments on commit c0976fc

Please sign in to comment.