Skip to content

Commit

Permalink
Expose timeToFirstMessage in Java/XML config
Browse files Browse the repository at this point in the history
Issue: SPR-16531
  • Loading branch information
rstoyanchev committed May 16, 2018
1 parent 18854ee commit 29158aa
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 26 deletions.
Expand Up @@ -308,6 +308,9 @@ private RuntimeBeanReference registerStompHandler(Element element, RuntimeBeanRe
if (transportElem.hasAttribute("send-buffer-size")) {
handlerDef.getPropertyValues().add("sendBufferSizeLimit", transportElem.getAttribute("send-buffer-size"));
}
if (transportElem.hasAttribute("time-to-first-message")) {
handlerDef.getPropertyValues().add("timeToFirstMessage", transportElem.getAttribute("time-to-first-message"));
}
Element factoriesElement = DomUtils.getChildElementByTagName(transportElem, "decorator-factories");
if (factoriesElement != null) {
ManagedList<Object> factories = extractBeanSubElements(factoriesElement, context);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,9 @@ public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
if (transportRegistration.getSendBufferSizeLimit() != null) {
this.subProtocolWebSocketHandler.setSendBufferSizeLimit(transportRegistration.getSendBufferSizeLimit());
}
if (transportRegistration.getTimeToFirstMessage() != null) {
this.subProtocolWebSocketHandler.setTimeToFirstMessage(transportRegistration.getTimeToFirstMessage());
}

this.stompHandler = new StompSubProtocolHandler();
if (transportRegistration.getMessageSizeLimit() != null) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,9 @@ public class WebSocketTransportRegistration {
@Nullable
private Integer sendBufferSizeLimit;

@Nullable
private Integer timeToFirstMessage;

private final List<WebSocketHandlerDecoratorFactory> decoratorFactories = new ArrayList<>(2);


Expand Down Expand Up @@ -151,6 +154,34 @@ protected Integer getSendBufferSizeLimit() {
return this.sendBufferSizeLimit;
}

/**
* Set the maximum time allowed in milliseconds after the WebSocket
* connection is established and before the first sub-protocol message is
* received.
*
* <p>This handler is for WebSocket connections that use a sub-protocol.
* Therefore, we expect the client to send at least one sub-protocol message
* in the beginning, or else we assume the connection isn't doing well, e.g.
* proxy issue, slow network, and can be closed.
*
* <p>By default this is set to {@code 60,000} (1 minute).
*
* @param timeToFirstMessage the maximum time allowed in milliseconds
* @since 5.1
*/
public WebSocketTransportRegistration setTimeToFirstMessage(int timeToFirstMessage) {
this.timeToFirstMessage = timeToFirstMessage;
return this;
}

/**
* Protected accessor for internal use.
*/
@Nullable
protected Integer getTimeToFirstMessage() {
return this.timeToFirstMessage;
}

/**
* Configure one or more factories to decorate the handler used to process
* WebSocket messages. This may be useful in some advanced use cases, for
Expand Down
Expand Up @@ -68,13 +68,8 @@
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {

/**
* Sessions connected to this handler use a sub-protocol. Hence we expect to
* receive some client messages. If we don't receive any within a minute, the
* connection isn't doing well (proxy issue, slow network?) and can be closed.
* @see #checkSessions()
*/
private static final int TIME_TO_FIRST_MESSAGE = 60 * 1000;
/** The default value for {@link #setTimeToFirstMessage(int) timeToFirstMessage}. */
private static final int DEFAULT_TIME_TO_FIRST_MESSAGE = 60 * 1000;


private final Log logger = LogFactory.getLog(SubProtocolWebSocketHandler.class);
Expand All @@ -98,6 +93,8 @@ public class SubProtocolWebSocketHandler

private int sendBufferSizeLimit = 512 * 1024;

private int timeToFirstMessage = DEFAULT_TIME_TO_FIRST_MESSAGE;

private volatile long lastSessionCheckTime = System.currentTimeMillis();

private final ReentrantLock sessionCheckLock = new ReentrantLock();
Expand Down Expand Up @@ -224,6 +221,35 @@ public int getSendBufferSizeLimit() {
return this.sendBufferSizeLimit;
}

/**
* Set the maximum time allowed in milliseconds after the WebSocket
* connection is established and before the first sub-protocol message is
* received.
*
* <p>This handler is for WebSocket connections that use a sub-protocol.
* Therefore, we expect the client to send at least one sub-protocol message
* in the beginning, or else we assume the connection isn't doing well, e.g.
* proxy issue, slow network, and can be closed.
*
* <p>By default this is set to {@code 60,000} (1 minute).
*
* @param timeToFirstMessage the maximum time allowed in milliseconds
* @since 5.1
* @see #checkSessions()
*/
public void setTimeToFirstMessage(int timeToFirstMessage) {
this.timeToFirstMessage = timeToFirstMessage;
}

/**
* Return the maximum time allowed after the WebSocket connection is
* established and before the first sub-protocol message.
* @since 5.1
*/
public int getTimeToFirstMessage() {
return this.timeToFirstMessage;
}

/**
* Return a String describing internal state and counters.
*/
Expand Down Expand Up @@ -457,7 +483,7 @@ private String resolveSessionId(Message<?> message) {
*/
private void checkSessions() {
long currentTime = System.currentTimeMillis();
if (!isRunning() || (currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE)) {
if (!isRunning() || (currentTime - this.lastSessionCheckTime < getTimeToFirstMessage())) {
return;
}

Expand All @@ -468,7 +494,7 @@ private void checkSessions() {
continue;
}
long timeSinceCreated = currentTime - holder.getCreateTime();
if (timeSinceCreated < TIME_TO_FIRST_MESSAGE) {
if (timeSinceCreated < getTimeToFirstMessage()) {
continue;
}
WebSocketSession session = holder.getSession();
Expand Down
Expand Up @@ -683,6 +683,22 @@
@param sendBufferSizeLimit the maximum number of bytes to buffer when
sending messages; if the value is less than or equal to 0 then buffering
is effectively disabled.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="time-to-first-message" type="xsd:string">

This comment has been minimized.

Copy link
@andrei-ivanov

andrei-ivanov May 18, 2018

maybe xsd:int instead of string?

This comment has been minimized.

Copy link
@rstoyanchev

rstoyanchev May 19, 2018

Author Contributor

Yes indeed.

<xsd:annotation>
<xsd:documentation><![CDATA[
Set the maximum time allowed in milliseconds after the WebSocket
connection is established and before the first sub-protocol message is
received.
This handler is for WebSocket connections that use a sub-protocol.
Therefore, we expect the client to send at least one sub-protocol message
in the beginning, or else we assume the connection isn't doing well, e.g.
proxy issue, slow network, and can be closed.
By default this is set to 60,000 (1 minute).
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Expand Up @@ -94,8 +94,8 @@
import static org.junit.Assert.*;

/**
* Test fixture for MessageBrokerBeanDefinitionParser.
* See test configuration files websocket-config-broker-*.xml.
* Test fixture for {@link MessageBrokerBeanDefinitionParser}.
* Also see test configuration files websocket-config-broker-*.xml.
*
* @author Brian Clozel
* @author Artem Bilan
Expand Down Expand Up @@ -147,6 +147,7 @@ public void simpleBroker() throws Exception {
assertEquals(Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp"), subProtocolWsHandler.getSubProtocols());
assertEquals(25 * 1000, subProtocolWsHandler.getSendTimeLimit());
assertEquals(1024 * 1024, subProtocolWsHandler.getSendBufferSizeLimit());
assertEquals(30 * 1000, subProtocolWsHandler.getTimeToFirstMessage());

Map<String, SubProtocolHandler> handlerMap = subProtocolWsHandler.getProtocolHandlerMap();
StompSubProtocolHandler stompHandler = (StompSubProtocolHandler) handlerMap.get("v12.stomp");
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@
import org.springframework.stereotype.Controller;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
Expand All @@ -61,16 +60,11 @@
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* Test fixture for
* {@link org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport}.
* Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}.
*
* @author Rossen Stoyanchev
*/
Expand Down Expand Up @@ -100,8 +94,8 @@ public void clientInboundChannelSendMessage() throws Exception {
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);

TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
webSocketHandler.handleMessage(session, textMessage);
webSocketHandler.handleMessage(session,
StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());

Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Expand Down Expand Up @@ -145,6 +139,7 @@ public void webSocketHandler() {

assertEquals(1024 * 1024, subWsHandler.getSendBufferSizeLimit());
assertEquals(25 * 1000, subWsHandler.getSendTimeLimit());
assertEquals(30 * 1000, subWsHandler.getTimeToFirstMessage());

Map<String, SubProtocolHandler> handlerMap = subWsHandler.getProtocolHandlerMap();
StompSubProtocolHandler protocolHandler = (StompSubProtocolHandler) handlerMap.get("v12.stomp");
Expand Down Expand Up @@ -240,6 +235,7 @@ public void configureWebSocketTransport(WebSocketTransportRegistration registrat
registration.setMessageSizeLimit(128 * 1024);
registration.setSendTimeLimit(25 * 1000);
registration.setSendBufferSizeLimit(1024 * 1024);
registration.setTimeToFirstMessage(30 * 1000);
}

@Override
Expand Down
Expand Up @@ -10,7 +10,7 @@
path-helper="urlPathHelper">

<!-- message-size=128*1024, send-buffer-size=1024*1024 -->
<websocket:transport message-size="131072" send-timeout="25000" send-buffer-size="1048576">
<websocket:transport message-size="131072" send-timeout="25000" send-buffer-size="1048576" time-to-first-message="30000">
<websocket:decorator-factories>
<bean class="org.springframework.web.socket.config.TestWebSocketHandlerDecoratorFactory" />
</websocket:decorator-factories>
Expand Down

0 comments on commit 29158aa

Please sign in to comment.