Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Ability to create a listener for events on the EventBusBridge

  • Loading branch information...
commit d7591ee5d50884bc1a6989a66977182354299502 1 parent dfe5fce
@Narigo Narigo authored
View
51 vertx-core/src/main/java/org/vertx/java/core/sockjs/EventBusBridgeListener.java
@@ -0,0 +1,51 @@
+package org.vertx.java.core.sockjs;
+
+import org.vertx.java.core.json.JsonObject;
+
+/**
+ * A listener for events on the EventBusBridge. Register an instance of this class on the SockJSServer to intercept
+ * events on the EventBusBridge.
+ */
+public interface EventBusBridgeListener {
+
+ /**
+ * This method gets fired when a message gets sent to an address.
+ * @param writeHandlerId The socket ID of the client who sent the message.
+ * @param address The address this message was sent to.
+ * @param message The sent message.
+ * @return True, if the message should still be sent to the address, false otherwise.
+ */
+ boolean sendingMessage(String writeHandlerId, final String address, JsonObject message);
+
+ /**
+ * This method gets fired when a message gets published to an address.
+ * @param writeHandlerId The socket ID of the client who sent the message.
+ * @param address The address this message was sent to.
+ * @param message The sent message.
+ * @return True, if the message should still be published to the address, false otherwise.
+ */
+ boolean publishingMessage(String writeHandlerId, final String address, JsonObject message);
+
+ /**
+ * This method gets fired when a client registered to an address. It should return whether the handler should still
+ * be registered.
+ * @param writeHandlerId The socket ID of the client who registered on the address.
+ * @param address The address the newly registered handler listens on.
+ * @return True, if the handler should still be registered, false otherwise.
+ */
+ boolean registeringHandler(String writeHandlerId, final String address);
+
+ /**
+ * This method gets fired when a client unregistered a handler for an address.
+ * @param writeHandlerId The socket ID of the client who registered on the address.
+ * @param address The address the unregistered handler listened on.
+ */
+ void unregisteredHandler(String writeHandlerId, final String address);
+
+ /**
+ * This method gets fired when a client disconnects.
+ * @param writeHandlerId The socket ID of the client who disconnected.
+ */
+ void clientDisconnected(String writeHandlerId);
+
+}
View
7 vertx-core/src/main/java/org/vertx/java/core/sockjs/SockJSServer.java
@@ -93,5 +93,12 @@ void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outbound
void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout, String authAddress);
+ /**
+ * Sets a listener for sending, publishing, registering, unregistering and disconnect events on the bridge.
+ * @param listener A listener for all the possible events.
+ * @return The SockJSServer instance for chaining.
+ */
+ SockJSServer setEventBusBridgeListener(EventBusBridgeListener listener);
+
}
View
15 vertx-core/src/main/java/org/vertx/java/core/sockjs/impl/DefaultSockJSServer.java
@@ -31,6 +31,7 @@
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
+import org.vertx.java.core.sockjs.EventBusBridgeListener;
import org.vertx.java.core.sockjs.SockJSServer;
import org.vertx.java.core.sockjs.SockJSSocket;
@@ -57,6 +58,8 @@
private WebSocketMatcher wsMatcher = new WebSocketMatcher();
private final Map<String, Session> sessions;
+ private EventBusBridgeListener bridgeHook = null;
+
public DefaultSockJSServer(final VertxInternal vertx, final HttpServer httpServer) {
this.vertx = vertx;
this.sessions = vertx.sharedData().getMap("_vertx.sockjssessions");
@@ -180,17 +183,23 @@ public void handle(HttpServerRequest req) {
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook));
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted, authTimeout);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout));
}
public void bridge(JsonObject sjsConfig, JsonArray inboundPermitted, JsonArray outboundPermitted,
long authTimeout, String authAddress) {
- new EventBusBridge(vertx, this, sjsConfig, inboundPermitted, outboundPermitted, authTimeout, authAddress);
+ installApp(sjsConfig, new EventBusBridge(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout, authAddress));
+ }
+
+ @Override
+ public SockJSServer setEventBusBridgeListener(EventBusBridgeListener bridgeHook) {
+ this.bridgeHook = bridgeHook;
+ return this;
}
private Handler<HttpServerRequest> createChunkingTestHandler() {
View
52 vertx-core/src/main/java/org/vertx/java/core/sockjs/impl/EventBusBridge.java
@@ -28,7 +28,7 @@
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
-import org.vertx.java.core.sockjs.SockJSServer;
+import org.vertx.java.core.sockjs.EventBusBridgeListener;
import org.vertx.java.core.sockjs.SockJSSocket;
import java.util.ArrayList;
@@ -58,6 +58,7 @@
private final Map<SockJSSocket, Set<String>> sockAuths = new HashMap<>();
private final List<JsonObject> inboundPermitted;
private final List<JsonObject> outboundPermitted;
+ private final EventBusBridgeListener bridgeHook;
private final long authTimeout;
private final String authAddress;
private final Vertx vertx;
@@ -76,19 +77,19 @@
return l;
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted) {
- this(vertx, sjsServer, sjsConfig, inboundPermitted, outboundPermitted, DEFAULT_AUTH_TIMEOUT, null);
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook) {
+ this(vertx, inboundPermitted, outboundPermitted, bridgeHook, DEFAULT_AUTH_TIMEOUT, null);
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted,
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook,
long authTimeout) {
- this(vertx, sjsServer, sjsConfig, inboundPermitted, outboundPermitted, authTimeout, null);
+ this(vertx, inboundPermitted, outboundPermitted, bridgeHook, authTimeout, null);
}
- EventBusBridge(Vertx vertx, SockJSServer sjsServer, JsonObject sjsConfig, JsonArray inboundPermitted,
- JsonArray outboundPermitted,
+ EventBusBridge(Vertx vertx, JsonArray inboundPermitted, JsonArray outboundPermitted,
+ EventBusBridgeListener bridgeHook,
long authTimeout,
String authAddress) {
this.vertx = vertx;
@@ -103,7 +104,7 @@
authAddress = DEFAULT_AUTH_ADDRESS;
}
this.authAddress = authAddress;
- sjsServer.installApp(sjsConfig, this);
+ this.bridgeHook = bridgeHook;
}
public void handle(final SockJSSocket sock) {
@@ -113,6 +114,10 @@ public void handle(final SockJSSocket sock) {
sock.endHandler(new SimpleHandler() {
public void handle() {
+ if (bridgeHook != null) {
+ bridgeHook.clientDisconnected(sock.writeHandlerID);
+ }
+
// On close unregister any handlers that haven't been unregistered
for (Map.Entry<String, Handler<Message<JsonObject>>> entry: handlers.entrySet()) {
eb.unregisterHandler(entry.getKey(), entry.getValue());
@@ -161,16 +166,37 @@ public void handle(Buffer data) {
String address = getMandatoryString(msg, "address");
switch (type) {
case "send":
- sendOrPub(true, msg, address);
+ if (bridgeHook != null) {
+ if (bridgeHook.sendingMessage(sock.writeHandlerID, address, msg)) {
+ sendOrPub(true, msg, address);
+ }
+ } else {
+ sendOrPub(true, msg, address);
+ }
break;
case "publish":
- sendOrPub(false, msg, address);
+ if (bridgeHook != null) {
+ if (bridgeHook.publishingMessage(sock.writeHandlerID, address, msg)) {
+ sendOrPub(false, msg, address);
+ }
+ } else {
+ sendOrPub(false, msg, address);
+ }
break;
case "register":
- handleRegister(address);
+ if (bridgeHook != null) {
+ if (bridgeHook.registeringHandler(sock.writeHandlerID, address)) {
+ handleRegister(address);
+ }
+ } else {
+ handleRegister(address);
+ }
break;
case "unregister":
handleUnregister(address);
+ if (bridgeHook != null) {
+ bridgeHook.unregisteredHandler(sock.writeHandlerID, address);
+ }
break;
default:
throw new IllegalStateException("Invalid type: " + type);
Please sign in to comment.
Something went wrong with that request. Please try again.