Skip to content

Commit

Permalink
SubProtocolWebSocketHandler provides protected decorateSession method
Browse files Browse the repository at this point in the history
Issue: SPR-16089
  • Loading branch information
jhoeller committed Oct 23, 2017
1 parent 9c7141a commit 268ccb6
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,13 +34,13 @@
* Wrap a {@link org.springframework.web.socket.WebSocketSession WebSocketSession}
* to guarantee only one thread can send messages at a time.
*
* <p>If a send is slow, subsequent attempts to send more messages from other
* threads will not be able to acquire the flush lock and messages will be
* buffered instead -- at that time, the specified buffer-size limit and
* send-time limit will be checked and the session closed if the limits are
* exceeded.
* <p>If a send is slow, subsequent attempts to send more messages from other threads
* will not be able to acquire the flush lock and messages will be buffered instead.
* At that time, the specified buffer-size limit and send-time limit will be checked
* and the session will be closed if the limits are exceeded.
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.0.3
*/
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator {
Expand All @@ -52,7 +52,6 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat

private final int bufferSizeLimit;


private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<WebSocketMessage<?>>();

private final AtomicInteger bufferSize = new AtomicInteger();
Expand All @@ -63,7 +62,6 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat

private volatile boolean closeInProgress;


private final Lock flushLock = new ReentrantLock();

private final Lock closeLock = new ReentrantLock();
Expand All @@ -82,10 +80,33 @@ public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTi
}


/**
* Return the configured send-time limit (milliseconds).
* @since 4.3.13
*/
public int getSendTimeLimit() {
return this.sendTimeLimit;
}

/**
* Return the configured buffer-size limit (number of bytes).
* @since 4.3.13
*/
public int getBufferSizeLimit() {
return this.bufferSizeLimit;
}

/**
* Return the current buffer size (number of bytes).
*/
public int getBufferSize() {
return this.bufferSize.get();
}

/**
* Return the time (milliseconds) since the current send started,
* or 0 if no send is currently in progress.
*/
public long getTimeSinceSendStarted() {
long start = this.sendStartTime;
return (start > 0 ? (System.currentTimeMillis() - start) : 0);
Expand All @@ -105,7 +126,7 @@ public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (logger.isTraceEnabled()) {
String text = String.format("Another send already in progress: " +
"session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), this.bufferSize.get());
getId(), getTimeSinceSendStarted(), getBufferSize());
logger.trace(text);
}
checkSessionLimits();
Expand Down Expand Up @@ -142,18 +163,18 @@ private boolean tryFlushMessageBuffer() throws IOException {
return false;
}

private void checkSessionLimits() throws IOException {
private void checkSessionLimits() {
if (!shouldNotSend() && this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > this.sendTimeLimit) {
if (getTimeSinceSendStarted() > getSendTimeLimit()) {
String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), this.sendTimeLimit);
setLimitExceeded(reason);
String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
limitExceeded(reason);
}
else if (this.bufferSize.get() > this.bufferSizeLimit) {
else if (getBufferSize() > getBufferSizeLimit()) {
String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, this.bufferSize.get(), getId(), this.bufferSizeLimit);
setLimitExceeded(reason);
String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason);
}
}
finally {
Expand All @@ -162,7 +183,7 @@ else if (this.bufferSize.get() > this.bufferSizeLimit) {
}
}

private void setLimitExceeded(String reason) {
private void limitExceeded(String reason) {
this.limitExceeded = true;
throw new SessionLimitExceededException(reason, CloseStatus.SESSION_NOT_RELIABLE);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,24 +25,23 @@
import org.springframework.web.socket.WebSocketSession;

/**
* A contract for handling WebSocket messages as part of a higher level protocol, referred
* to as "sub-protocol" in the WebSocket RFC specification. Handles both
* A contract for handling WebSocket messages as part of a higher level protocol,
* referred to as "sub-protocol" in the WebSocket RFC specification. Handles both
* {@link WebSocketMessage}s from a client as well as {@link Message}s to a client.
* <p>
* Implementations of this interface can be configured on a
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to delegate
* messages to based on the sub-protocol requested by the client through the
* {@code Sec-WebSocket-Protocol} request header.
*
* <p>Implementations of this interface can be configured on a
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to
* delegate messages to based on the sub-protocol requested by the client through
* the {@code Sec-WebSocket-Protocol} request header.
*
* @author Andy Wilkinson
* @author Rossen Stoyanchev
*
* @since 4.0
*/
public interface SubProtocolHandler {

/**
* Return the list of sub-protocols supported by this handler, never {@code null}.
* Return the list of sub-protocols supported by this handler (never {@code null}).
*/
List<String> getSupportedProtocols();

Expand All @@ -52,12 +51,11 @@ public interface SubProtocolHandler {
* @param message the client message
* @param outputChannel an output channel to send messages to
*/
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message,
MessageChannel outputChannel) throws Exception;
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
throws Exception;

/**
* Handle the given {@link Message} to the client associated with the given WebSocket
* session.
* Handle the given {@link Message} to the client associated with the given WebSocket session.
* @param session the client session
* @param message the client message
*/
Expand All @@ -82,7 +80,7 @@ void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> messa
* @param closeStatus the reason why the session was closed
* @param outputChannel a channel
*/
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus,
MessageChannel outputChannel) throws Exception;
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
throws Exception;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.web.socket.messaging;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -289,7 +288,7 @@ public void afterConnectionEstablished(WebSocketSession session) throws Exceptio
return;
}
this.stats.incrementSessionCount(session);
session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
session = decorateSession(session);
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
}
Expand Down Expand Up @@ -374,6 +373,23 @@ public boolean supportsPartialMessages() {
}


/**
* Decorate the given {@link WebSocketSession}, if desired.
* <p>The default implementation builds a {@link ConcurrentWebSocketSessionDecorator}
* with the configured {@link #getSendTimeLimit() send-time limit} and
* {@link #getSendBufferSizeLimit() buffer-size limit}.
* @param session the original {@code WebSocketSession}
* @return the decorated {@code WebSocketSession}, or potentially the given session as-is
* @since 4.3.13
*/
protected WebSocketSession decorateSession(WebSocketSession session) {
return new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
}

/**
* Find a {@link SubProtocolHandler} for the given session.
* @param session the {@code WebSocketSession} to find a handler for
*/
protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) {
String protocol = null;
try {
Expand Down Expand Up @@ -428,12 +444,11 @@ private String resolveSessionId(Message<?> message) {
* When a session is connected through a higher-level protocol it has a chance
* to use heartbeat management to shut down sessions that are too slow to send
* or receive messages. However, after a WebSocketSession is established and
* before the higher level protocol is fully connected there is a possibility
* for sessions to hang. This method checks and closes any sessions that have
* been connected for more than 60 seconds without having received a single
* message.
* before the higher level protocol is fully connected there is a possibility for
* sessions to hang. This method checks and closes any sessions that have been
* connected for more than 60 seconds without having received a single message.
*/
private void checkSessions() throws IOException {
private void checkSessions() {
long currentTime = System.currentTimeMillis();
if (!isRunning() || (currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE)) {
return;
Expand Down Expand Up @@ -493,12 +508,13 @@ private static class WebSocketSessionHolder {

private final WebSocketSession session;

private final long createTime = System.currentTimeMillis();
private final long createTime;

private volatile boolean handledMessages;
private volatile boolean hasHandledMessages;

private WebSocketSessionHolder(WebSocketSession session) {
public WebSocketSessionHolder(WebSocketSession session) {
this.session = session;
this.createTime = System.currentTimeMillis();
}

public WebSocketSession getSession() {
Expand All @@ -510,17 +526,17 @@ public long getCreateTime() {
}

public void setHasHandledMessages() {
this.handledMessages = true;
this.hasHandledMessages = true;
}

public boolean hasHandledMessages() {
return this.handledMessages;
return this.hasHandledMessages;
}

@Override
public String toString() {
return "WebSocketSessionHolder[session=" + this.session + ", createTime=" +
this.createTime + ", hasHandledMessages=" + this.handledMessages + "]";
this.createTime + ", hasHandledMessages=" + this.hasHandledMessages + "]";
}
}

Expand Down

0 comments on commit 268ccb6

Please sign in to comment.