Race condition when using an event bus bridge hook (events handler), between socket closed event and other events #1380
Comments
Still happening on release v4.1. A quick reproducer based on https://github.com/vert-x3/vertx-examples/tree/4.x/web-examples/src/main/java/io/vertx/example/web/realtime:
package io.vertx.example.web.realtime;
import io.vertx.core.AbstractVerticle;
import io.vertx.example.util.Runner;
import io.vertx.ext.bridge.BridgeEventType;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
/*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class Server extends AbstractVerticle {
private static int _counter = 1;
// Convenience method so you can run it in your IDE
public static void main(String[] args) {
Runner.runExample(Server.class);
}
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
// Allow outbound traffic to the news-feed address
SockJSBridgeOptions options = new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("news-feed"));
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router subRouter = sockJSHandler.bridge(options, event -> {
// You can also optionally provide a handler like this which will be passed any events that occur on the bridge
// You can use this for monitoring or logging, or to change the raw messages in-flight.
// It can also be used for fine grained access control.
if (event.type() == BridgeEventType.SOCKET_CREATED) {
System.out.println("A socket was created");
event.complete(true);
} else if(event.type() == BridgeEventType.SOCKET_CLOSED) {
System.out.println("A socket was closed");
event.complete(true);
} else if(event.type() == BridgeEventType.REGISTER) {
System.out.println("Delaying REGISTER event...");
vertx.executeBlocking(future -> {
try {
Thread.sleep(5000);
future.complete();
} catch (InterruptedException e) {
future.fail(e);
}
}, blocking_res -> {
if(blocking_res.succeeded()) {
System.out.println("Accept REGISTER event");
event.complete(true);
} else {
System.err.println("Couldn't sleep after receiving a REGISTER bridge event" +
(blocking_res.cause() != null ? ": " + blocking_res.cause().getMessage() : ""));
event.complete(false);
}
});
} else {
event.complete(true);
}
});
router.mountSubRouter("/eventbus", subRouter);
// Serve the static resources
router.route().handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router).listen(8080);
System.out.println("Server is listening on port 8080");
// Send a message to the address "news-feed" every second (send() is point-to-point, unlike publish())
vertx.setPeriodic(1000, t -> vertx.eventBus().send("news-feed", "news #" + _counter++ + " from the server!"));
}
}
Steps to reproduce
Can you investigate this, @pmlopes? It looks related to a potential websocket leak.
@julien3 I've created a PR to address the issue. As you found, this is a timing issue: because the interceptor runs as an asynchronous call, the socket state may have changed by the time the call completes. The map that holds the data is still valid, so there are no issues there. The fix is to assert that the socket state (more exactly, the presence of the socket) is the same at the beginning and at the end of the interceptor call.
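To make the idea of comparing the socket's presence before and after the interceptor call concrete, here is a minimal single-threaded sketch. It is not the code from the PR: the class `PresenceCheckSketch`, the method `callHookThenRun`, and the use of `CompletableFuture` as a stand-in for the bridge event are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the described fix; names do not come from vertx-web.
final class PresenceCheckSketch {
    // Stand-in for the bridge's map of known sockets.
    private final Set<String> openSockets = new HashSet<>();

    void open(String sock) {
        openSockets.add(sock);
    }

    void close(String sock) {
        openSockets.remove(sock);
    }

    // Runs okAction only if the hook accepts the event AND the socket's
    // presence is the same after the hook as it was before the hook.
    void callHookThenRun(String sock, CompletableFuture<Boolean> hook, Runnable okAction) {
        boolean presentBefore = openSockets.contains(sock);
        hook.thenAccept(accepted -> {
            boolean presentAfter = openSockets.contains(sock);
            if (accepted && presentBefore == presentAfter) {
                okAction.run();
            }
        });
    }

    // Replays one REGISTER-like event and reports whether okAction ran.
    static boolean okActionRuns(boolean closeWhileHookPending) {
        PresenceCheckSketch bridge = new PresenceCheckSketch();
        CompletableFuture<Boolean> hook = new CompletableFuture<>();
        boolean[] ran = {false};
        bridge.open("sock-1");
        bridge.callHookThenRun("sock-1", hook, () -> ran[0] = true);
        if (closeWhileHookPending) {
            bridge.close("sock-1"); // the client disconnects before the hook answers
        }
        hook.complete(true); // the hook accepts the event
        return ran[0];
    }

    public static void main(String[] args) {
        System.out.println("socket stays open: okAction ran = " + okActionRuns(false));
        System.out.println("socket closed early: okAction ran = " + okActionRuns(true));
    }
}
```

Comparing presence before and after, rather than only requiring presence after, also lets events for a socket that was never in the map keep working, since for them the state is unchanged.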
@vietj I believe the issue is fixed in the referenced PR. I don't think there's any leak, as it turned out to be a timing bug.
Fix #1380: Only run success action if socket map state is still valid
Version
Context
We started to see that some messages were not delivered on an event bus (bridge) address. After multiple reconnections, a client can no longer listen on a particular address: it receives nothing, or only a few of the messages sent. The server needs to be restarted.
When an event bus bridge hook (events handler) is used, there seems to be a race condition in io.vertx.ext.web.handler.sockjs.impl.EventBusBridgeImpl: handleSocketClosed may be called while internalHandleRegister (or the unregister or send/publish handling) is still in progress. This can happen before checkCallHook's okAction runs, because we are still waiting for the hook (events handler) to accept the event.
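The ordering described above can be replayed in a small model. This is a simplified sketch, not the EventBusBridgeImpl source: the class `BridgeRaceSketch`, its fields, and the `"socket@address"` keys are hypothetical, and a `CompletableFuture` stands in for the pending bridge event.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical model of the bridge bookkeeping.
final class BridgeRaceSketch {
    // Per-socket bookkeeping: socket id -> addresses registered through it.
    private final Map<String, Set<String>> sockInfos = new HashMap<>();
    // Stand-in for live event bus consumers, keyed by "socket@address".
    private final Set<String> consumers = new HashSet<>();

    void socketCreated(String sock) {
        sockInfos.put(sock, new HashSet<>());
    }

    // REGISTER: the consumer is created only once the hook accepts the event.
    void register(String sock, String address, CompletableFuture<Boolean> hook) {
        hook.thenAccept(accepted -> {
            if (!accepted) {
                return;
            }
            // okAction with no check that the socket is still open.
            consumers.add(sock + "@" + address);
            Set<String> addresses = sockInfos.get(sock);
            if (addresses != null) {
                addresses.add(address);
            }
        });
    }

    // SOCKET_CLOSED: drop the bookkeeping and every consumer recorded in it.
    void socketClosed(String sock) {
        Set<String> addresses = sockInfos.remove(sock);
        if (addresses != null) {
            for (String address : addresses) {
                consumers.remove(sock + "@" + address);
            }
        }
    }

    // Replays one connection and returns how many consumers outlive the socket.
    static int consumersAfterClose(boolean closeWhileHookPending) {
        BridgeRaceSketch bridge = new BridgeRaceSketch();
        CompletableFuture<Boolean> hook = new CompletableFuture<>();
        bridge.socketCreated("sock-1");
        bridge.register("sock-1", "news-feed", hook);
        if (closeWhileHookPending) {
            bridge.socketClosed("sock-1"); // close arrives before the hook answers
            hook.complete(true);           // okAction runs against a closed socket
        } else {
            hook.complete(true);
            bridge.socketClosed("sock-1");
        }
        return bridge.consumers.size();
    }

    public static void main(String[] args) {
        System.out.println("hook answers first: " + consumersAfterClose(false)); // prints 0
        System.out.println("close arrives first: " + consumersAfterClose(true)); // prints 1
    }
}
```

In the second ordering the consumer is created after the cleanup has already run, so nothing ever removes it. With point-to-point sends, such a stale consumer would plausibly take a share of the messages, which matches the symptom of clients receiving too few of them.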
Do you have a reproducer?
See steps below
Steps to reproduce
We reproduced this with custom tools but I think this scenario is sufficient to reproduce it:
Extra
Should apply to 3.8.1 and master as nothing was changed that could fix this issue in this file (EventBusBridgeImpl.java).
We fixed this internally by just checking for
if (!sockInfos.containsKey(sock)) return;
at the beginning of checkCallHook's okAction handlers.
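As an illustration of where such a guard sits, the sketch below puts the `sockInfos.containsKey(sock)` check at the top of an okAction. Only that check comes from this report; the class `GuardedOkActionSketch`, the string socket ids, and the `CompletableFuture` standing in for the hook are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model; only the containsKey guard mirrors the reported workaround.
final class GuardedOkActionSketch {
    private final Map<String, Set<String>> sockInfos = new HashMap<>();
    private final Set<String> consumers = new HashSet<>();

    void socketCreated(String sock) {
        sockInfos.put(sock, new HashSet<>());
    }

    void register(String sock, String address, CompletableFuture<Boolean> hook) {
        hook.thenAccept(accepted -> {
            if (!accepted) {
                return;
            }
            // Guard: the socket may have been closed while the hook was pending.
            if (!sockInfos.containsKey(sock)) {
                return;
            }
            sockInfos.get(sock).add(address);
            consumers.add(sock + "@" + address);
        });
    }

    void socketClosed(String sock) {
        Set<String> addresses = sockInfos.remove(sock);
        if (addresses != null) {
            for (String address : addresses) {
                consumers.remove(sock + "@" + address);
            }
        }
    }

    // Returns how many consumers outlive the socket for the given ordering.
    static int consumersAfterClose(boolean closeWhileHookPending) {
        GuardedOkActionSketch bridge = new GuardedOkActionSketch();
        CompletableFuture<Boolean> hook = new CompletableFuture<>();
        bridge.socketCreated("sock-1");
        bridge.register("sock-1", "news-feed", hook);
        if (closeWhileHookPending) {
            bridge.socketClosed("sock-1");
            hook.complete(true); // the guard turns this into a no-op
        } else {
            hook.complete(true);
            bridge.socketClosed("sock-1");
        }
        return bridge.consumers.size();
    }

    public static void main(String[] args) {
        System.out.println("hook answers first: " + consumersAfterClose(false)); // prints 0
        System.out.println("close arrives first: " + consumersAfterClose(true)); // prints 0
    }
}
```

The sketch is single-threaded, so the check and the registration cannot be interleaved with a close; whether that also holds in the real bridge depends on all of these callbacks running on the same context.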
It could probably explain #1327: the message consumer's handler references the closed socket when the race condition issue happens while a REGISTER event is being handled.