Skip to content

Commit

Permalink
Split messages only if configured to do so
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Mar 12, 2024
1 parent 76d00d7 commit 73ee86c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -75,16 +76,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif

private static final Log logger = LogFactory.getLog(WebSocketStompClient.class);

/**
* The default max size for in&outbound STOMP message.
*/
private static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;

private final WebSocketClient webSocketClient;

private int inboundMessageSizeLimit = DEFAULT_MESSAGE_MAX_SIZE;
private int inboundMessageSizeLimit = 64 * 1024;

private int outboundMessageSizeLimit = DEFAULT_MESSAGE_MAX_SIZE;
@Nullable
private Integer outboundMessageSizeLimit;

private boolean autoStartup = true;

Expand Down Expand Up @@ -131,7 +129,7 @@ public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
* Since a STOMP message can be received in multiple WebSocket messages,
* buffering may be required and this property determines the maximum buffer
* size per message.
* <p>By default this is set to 64 * 1024 (64K), see {@link WebSocketStompClient#DEFAULT_MESSAGE_MAX_SIZE}.
* <p>By default this is set to 64 * 1024 (64K).
*/
public void setInboundMessageSizeLimit(int inboundMessageSizeLimit) {
this.inboundMessageSizeLimit = inboundMessageSizeLimit;
Expand All @@ -148,18 +146,19 @@ public int getInboundMessageSizeLimit() {
* Configure the maximum size allowed for outbound STOMP message.
* If STOMP message's size exceeds {@link WebSocketStompClient#outboundMessageSizeLimit},
* STOMP message is split into multiple frames.
* <p>By default this is set to 64 * 1024 (64K), see {@link WebSocketStompClient#DEFAULT_MESSAGE_MAX_SIZE}.
* <p>By default this is not set in which case each STOMP message are not split.
* @since 6.2
*/
public void setOutboundMessageSizeLimit(int outboundMessageSizeLimit) {
public void setOutboundMessageSizeLimit(Integer outboundMessageSizeLimit) {
this.outboundMessageSizeLimit = outboundMessageSizeLimit;
}

/**
* Get the configured outbound message buffer size in bytes.
* @since 6.2
*/
public int getOutboundMessageSizeLimit() {
@Nullable
public Integer getOutboundMessageSizeLimit() {
return this.outboundMessageSizeLimit;
}

Expand Down Expand Up @@ -479,8 +478,13 @@ public CompletableFuture<Void> sendAsync(Message<byte[]> message) {
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
for (WebSocketMessage<?> webSocketMessage : this.codec.encode(message, session.getClass())) {
session.sendMessage(webSocketMessage);
if (this.codec.hasSplittingEncoder()) {
for (WebSocketMessage<?> outMessage : this.codec.encodeAndSplit(message, session.getClass())) {
session.sendMessage(outMessage);
}
}
else {
session.sendMessage(this.codec.encode(message, session.getClass()));
}
future.complete(null);
}
Expand Down Expand Up @@ -592,11 +596,13 @@ private static class StompWebSocketMessageCodec {

private final BufferingStompDecoder bufferingDecoder;

@Nullable
private final SplittingStompEncoder splittingEncoder;

public StompWebSocketMessageCodec(int inboundMessageSizeLimit, int outboundMessageSizeLimit) {
public StompWebSocketMessageCodec(int inboundMessageSizeLimit, @Nullable Integer outboundMessageSizeLimit) {
this.bufferingDecoder = new BufferingStompDecoder(DECODER, inboundMessageSizeLimit);
this.splittingEncoder = new SplittingStompEncoder(ENCODER, outboundMessageSizeLimit);
this.splittingEncoder = (outboundMessageSizeLimit != null ?
new SplittingStompEncoder(ENCODER, outboundMessageSizeLimit) : null);
}

public List<Message<byte[]>> decode(WebSocketMessage<?> webSocketMessage) {
Expand All @@ -622,21 +628,41 @@ else if (webSocketMessage instanceof BinaryMessage binaryMessage) {
return result;
}

public List<WebSocketMessage<?>> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
public boolean hasSplittingEncoder() {
return (this.splittingEncoder != null);
}

public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
StompHeaderAccessor accessor = getStompHeaderAccessor(message);
byte[] payload = message.getPayload();
byte[] frame = ENCODER.encode(accessor.getMessageHeaders(), payload);
return (useBinary(accessor, payload, sessionType) ? new BinaryMessage(frame) : new TextMessage(frame));
}

public List<WebSocketMessage<?>> encodeAndSplit(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
Assert.state(this.splittingEncoder != null, "No SplittingEncoder");
StompHeaderAccessor accessor = getStompHeaderAccessor(message);
byte[] payload = message.getPayload();
List<byte[]> frames = this.splittingEncoder.encode(accessor.getMessageHeaders(), payload);
boolean useBinary = useBinary(accessor, payload, sessionType);

List<WebSocketMessage<?>> messages = new ArrayList<>(frames.size());
frames.forEach(frame -> messages.add(useBinary ? new BinaryMessage(frame) : new TextMessage(frame)));
return messages;
}

private static StompHeaderAccessor getStompHeaderAccessor(Message<byte[]> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "No StompHeaderAccessor available");
byte[] payload = message.getPayload();
List<byte[]> frames = splittingEncoder.encode(accessor.getMessageHeaders(), payload);
return accessor;
}

private static boolean useBinary(
StompHeaderAccessor accessor, byte[] payload, Class<? extends WebSocketSession> sessionType) {

boolean useBinary = (payload.length > 0 &&
return (payload.length > 0 &&
!(SockJsSession.class.isAssignableFrom(sessionType)) &&
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType()));

List<WebSocketMessage<?>> messages = new ArrayList<>();
for (byte[] frame : frames) {
messages.add(useBinary ? new BinaryMessage(frame) : new TextMessage(frame));
}
return messages;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class WebSocketStompClientIntegrationTests {

private AnnotationConfigWebApplicationContext wac;

private String url;


@BeforeEach
void setUp(TestInfo testInfo) throws Exception {
Expand All @@ -83,6 +85,8 @@ void setUp(TestInfo testInfo) throws Exception {
WebSocketClient webSocketClient = new StandardWebSocketClient();
this.stompClient = new WebSocketStompClient(webSocketClient);
this.stompClient.setMessageConverter(new StringMessageConverter());

this.url = "ws://127.0.0.1:" + this.server.getPort() + "/stomp";
}

@AfterEach
Expand All @@ -109,17 +113,30 @@ void tearDown() {


@Test
@SuppressWarnings("deprecation")
void publishSubscribe() throws Exception {
String url = "ws://127.0.0.1:" + this.server.getPort() + "/stomp";

TestHandler testHandler = new TestHandler("/topic/foo", "payload");
this.stompClient.connect(url, testHandler);
this.stompClient.connectAsync(this.url, testHandler);

assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue();
assertThat(testHandler.getReceived()).containsExactly("payload");
}

@Test
void publishSubscribeWithSlitMessage() throws Exception {
StringBuilder sb = new StringBuilder();
while (sb.length() < 1024) {
sb.append("A message with a long body... ");
}
String payload = sb.toString();

TestHandler testHandler = new TestHandler("/topic/foo", payload);
this.stompClient.setOutboundMessageSizeLimit(512);
this.stompClient.connectAsync(this.url, testHandler);

assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue();
assertThat(testHandler.getReceived()).containsExactly(payload);
}


@Configuration(proxyBeanMethods = false)
static class TestConfig extends WebSocketMessageBrokerConfigurationSupport {
Expand Down

0 comments on commit 73ee86c

Please sign in to comment.