Skip to content

Commit

Permalink
STOMP server process client frames that would not send initially a co…
Browse files Browse the repository at this point in the history
…nnect frame

A Vert.x STOMP server processes client STOMP frames without checking that the client send an initial CONNECT frame replied with a successful CONNECTED frame. The client can subscribe to a destination or publish message without prior authentication. Any Vert.x STOMP server configured with an authentication handler is impacted.

Fixes CVE-2023-32081
  • Loading branch information
vietj committed May 12, 2023
1 parent 5cc8222 commit 0de4bc5
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class DefaultConnectHandler implements Handler<ServerFrame> {

@Override
public void handle(ServerFrame sf) {
// Server negotiation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ public interface StompServerConnection {
* @param pingHandler the ping handler
*/
void configureHeartbeat(long ping, long pong, Handler<StompServerConnection> pingHandler);

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class DefaultStompHandler implements StompServerHandler {
private final Vertx vertx;
private final Context context;

private Handler<ServerFrame> connectHandler = new DefaultConnectHandler();
private Handler<ServerFrame> connectHandler;

private Handler<ServerFrame> stompHandler;

Expand Down Expand Up @@ -125,6 +125,7 @@ public DefaultStompHandler(Vertx vertx) {
this.context = Vertx.currentContext();
this.destinations = vertx.sharedData().getLocalMap("stomp.destinations");
this.users = new ConcurrentHashMap<>();
this.connectHandler = new DefaultConnectHandler();
}

@Override
Expand Down
58 changes: 54 additions & 4 deletions src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.vertx.core.net.NetServer;
import io.vertx.ext.stomp.*;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of the {@link StompServer}.
Expand Down Expand Up @@ -109,7 +111,17 @@ public StompServer listen(int port, String host, Handler<AsyncResult<StompServer
"server.");
server
.connectHandler(socket -> {
StompServerConnection connection = new StompServerTCPConnectionImpl(socket, this, writingFrameHandler);
AtomicBoolean connected = new AtomicBoolean();
AtomicBoolean firstFrame = new AtomicBoolean();
StompServerConnection connection = new StompServerTCPConnectionImpl(socket, this, frame -> {
if (frame.frame().getCommand() == Command.CONNECTED) {
connected.set(true);
}
Handler<ServerFrame> h = writingFrameHandler;
if (h != null) {
h.handle(frame);
}
});
FrameParser parser = new FrameParser(options);
socket.exceptionHandler((exception) -> {
LOGGER.error("The STOMP server caught a TCP socket error - closing connection", exception);
Expand All @@ -123,7 +135,21 @@ public StompServer listen(int port, String host, Handler<AsyncResult<StompServer
connection.close();
}
)
.handler(frame -> stomp.handle(new ServerFrameImpl(frame, connection)));
.handler(frame -> {
if (frame.getCommand() == Command.CONNECT || frame.getCommand() == Command.STOMP) {
if (firstFrame.compareAndSet(false, true)) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
connection.close();
}
} else if (connected.get()) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
connection.close();
}
});
socket.handler(parser);
})
.listen(port, host).onComplete(ar -> {
Expand Down Expand Up @@ -218,7 +244,17 @@ public Handler<ServerWebSocket> webSocketHandler() {
socket.reject();
return;
}
StompServerConnection connection = new StompServerWebSocketConnectionImpl(socket, this, writingFrameHandler);
AtomicBoolean connected = new AtomicBoolean();
AtomicBoolean firstFrame = new AtomicBoolean();
StompServerConnection connection = new StompServerWebSocketConnectionImpl(socket, this, frame -> {
if (frame.frame().getCommand() == Command.CONNECTED || frame.frame().getCommand() == Command.STOMP) {
connected.set(true);
}
Handler<ServerFrame> h = writingFrameHandler;
if (h != null) {
h.handle(frame);
}
});
FrameParser parser = new FrameParser(options);
socket.exceptionHandler((exception) -> {
LOGGER.error("The STOMP server caught a WebSocket error - closing connection", exception);
Expand All @@ -232,7 +268,21 @@ public Handler<ServerWebSocket> webSocketHandler() {
connection.close();
}
)
.handler(frame -> stomp.handle(new ServerFrameImpl(frame, connection)));
.handler(frame -> {
if (frame.getCommand() == Command.CONNECT) {
if (firstFrame.compareAndSet(false, true)) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
connection.close();
}
} else if (connected.get()) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
connection.close();
}
});
socket.handler(parser);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

package io.vertx.ext.stomp.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
Expand All @@ -38,15 +44,21 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.Arrays;

/**
* Tests STOMP server with security.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@RunWith(VertxUnitRunner.class)
public class SecuredServerConnectionTest {

private Vertx vertx;
private StompServer server;
private HttpServer wsServer;
private HttpClient wsClient;
private StompClient client;

@Rule
public RunTestOnContext rule = new RunTestOnContext();
Expand All @@ -55,9 +67,17 @@ public class SecuredServerConnectionTest {
public void setUp(TestContext context) {
vertx = rule.vertx();
AuthenticationProvider provider = PropertyFileAuthentication.create(vertx, "test-auth.properties");
server = StompServer.create(vertx, new StompServerOptions().setSecured(true))
.handler(StompServerHandler.create(vertx).authProvider(provider));
server.listen().onComplete(context.asyncAssertSuccess());
server = StompServer.create(vertx, new StompServerOptions()
.setSecured(true)
.setWebsocketBridge(true)
.setWebsocketPath("/stomp"))
.handler(StompServerHandler.create(vertx).authProvider(provider));
server.listen(StompServerOptions.DEFAULT_STOMP_PORT).onComplete(context.asyncAssertSuccess());
wsServer = vertx.createHttpServer(new HttpServerOptions().setWebSocketSubProtocols(Arrays.asList("v10.stomp", "v11.stomp")))
.webSocketHandler(server.webSocketHandler());
wsServer.listen(8080).onComplete(context.asyncAssertSuccess());
wsClient = vertx.createHttpClient();
client = StompClient.create(vertx, new StompClientOptions().setLogin("admin").setPasscode("admin"));
}

@After
Expand Down Expand Up @@ -162,11 +182,66 @@ public void testClientConnectRejection(TestContext context) {
}

void validate(TestContext context, Buffer buffer) {
context.assertTrue(buffer.toString().contains("CONNECTED"));
context.assertTrue(buffer.toString().contains("CONNECTED"), "Was expected <" + buffer.toString() + "> to contain 'CONNECTED'");
context.assertTrue(buffer.toString().contains("version:1.2"));

User user = server.stompHandler().getUserBySession(extractSession(buffer.toString()));
context.assertNotNull(user);
}

@Test
public void testTCPClientMustBeConnected(TestContext context) {
Async async = context.async();
NetClient client = vertx.createNetClient();
testClientMustBeConnected(context, v -> {
client.connect(server.actualPort(), "0.0.0.0").onComplete(context.asyncAssertSuccess(so -> {
Buffer received = Buffer.buffer();
so.handler(received::appendBuffer);
so.write(
"SEND\n" +
"destination:/test\n" +
"\n" +
"hello" +
FrameParser.NULL);
so.endHandler(v2 -> {
context.assertTrue(received.toString().startsWith("ERROR\n"));
async.complete();
});
}));
});
}

@Test
public void testWebSocketClientMustBeConnected(TestContext context) {
Async async = context.async();
testClientMustBeConnected(context, v -> {
wsClient.webSocket(8080, "localhost", "/stomp").onComplete(context.asyncAssertSuccess(ws -> {
Buffer received = Buffer.buffer();
ws.binaryMessageHandler(received::appendBuffer);
ws.writeBinaryMessage(
Buffer.buffer("SEND\n" +
"destination:/test\n" +
"\n" +
"hello" +
FrameParser.NULL));
ws.endHandler(v2 -> {
context.assertTrue(received.toString().startsWith("ERROR\n"));
async.complete();
});
}));
});
}

private void testClientMustBeConnected(TestContext context, Handler<Void> cont) {
client
.connect(server.actualPort(), "localhost")
.onComplete(context.asyncAssertSuccess(conn -> {
Future<String> fut = conn.subscribe("/test", frame -> {
context.fail("Should not receive a messsage");
});
fut.onComplete(context.asyncAssertSuccess(v2 -> {
cont.handle(null);
}));
}));
}
}

0 comments on commit 0de4bc5

Please sign in to comment.