Skip to content

Commit

Permalink
Support splitting STOMP messages in WebSocketStompClient
Browse files Browse the repository at this point in the history
  • Loading branch information
injae-kim authored and rstoyanchev committed Mar 12, 2024
1 parent bf014ef commit 76d00d7
Show file tree
Hide file tree
Showing 6 changed files with 580 additions and 10 deletions.
15 changes: 15 additions & 0 deletions framework-docs/modules/ROOT/pages/web/websocket/stomp/client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,20 @@ it handle ERROR frames in addition to the `handleException` callback for
exceptions from the handling of messages and `handleTransportError` for
transport-level errors including `ConnectionLostException`.

You can also use `setInboundMessageSizeLimit(limit)` and `setOutboundMessageSizeLimit(limit)`
to limit the maximum size of inbound and outbound message size.
When outbound message size exceeds `outboundMessageSizeLimit`, message is split into multiple incomplete frames.
Then receiver buffers these incomplete frames and reassemble to complete message.
When inbound message size exceeds `inboundMessageSizeLimit`, throw `StompConversionException`.
The default value of in&outboundMessageSizeLimit is `64KB`.

[source,java,indent=0,subs="verbatim,quotes"]
----
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB
----



Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.messaging.simp.stomp;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.util.Assert;

/**
* An extension of {@link org.springframework.messaging.simp.stomp.StompEncoder}
* that splits the STOMP message to multiple incomplete STOMP frames
* when the encoded bytes length exceeds {@link SplittingStompEncoder#bufferSizeLimit}.
*
* @author Injae Kim
* @since 6.2
* @see StompEncoder
*/
public class SplittingStompEncoder {

private final StompEncoder encoder;

private final int bufferSizeLimit;

public SplittingStompEncoder(StompEncoder encoder, int bufferSizeLimit) {
Assert.notNull(encoder, "StompEncoder is required");
Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
this.encoder = encoder;
this.bufferSizeLimit = bufferSizeLimit;
}

/**
* Encodes the given payload and headers into a list of one or more {@code byte[]}s.
* @param headers the headers
* @param payload the payload
* @return the list of one or more encoded messages
*/
public List<byte[]> encode(Map<String, Object> headers, byte[] payload) {
byte[] result = this.encoder.encode(headers, payload);
int length = result.length;

if (length <= this.bufferSizeLimit) {
return List.of(result);
}

List<byte[]> frames = new ArrayList<>();
for (int i = 0; i < length; i += this.bufferSizeLimit) {
frames.add(Arrays.copyOfRange(result, i, Math.min(i + this.bufferSizeLimit, length)));
}
return frames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public MessageHeaderInitializer getHeaderInitializer() {
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
* list of {@link Message Messages}. If the input buffer contains partial STOMP frame
* content, or additional content with a partial STOMP frame, the buffer is
* reset and {@code null} is returned.
* reset and an empty list is returned.
* @param byteBuffer the buffer to decode the STOMP frame from
* @return the decoded messages, or an empty list if none
* @throws StompConversionException raised in case of decoding issues
Expand Down
Loading

0 comments on commit 76d00d7

Please sign in to comment.