Skip to content

Commit

Permalink
SubProtocolWebSocketHandler provides protected decorateSession method
Browse files Browse the repository at this point in the history
Issue: SPR-16089
  • Loading branch information
jhoeller committed Oct 21, 2017
1 parent d418ba1 commit 5809f5b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 34 deletions.
Expand Up @@ -126,7 +126,7 @@ public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (logger.isTraceEnabled()) {
String text = String.format("Another send already in progress: " +
"session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), this.bufferSize.get());
getId(), getTimeSinceSendStarted(), getBufferSize());
logger.trace(text);
}
checkSessionLimits();
Expand Down Expand Up @@ -166,14 +166,14 @@ private boolean tryFlushMessageBuffer() throws IOException {
private void checkSessionLimits() {
if (!shouldNotSend() && this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > this.sendTimeLimit) {
if (getTimeSinceSendStarted() > getSendTimeLimit()) {
String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), this.sendTimeLimit);
String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
limitExceeded(reason);
}
else if (this.bufferSize.get() > this.bufferSizeLimit) {
else if (getBufferSize() > getBufferSizeLimit()) {
String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, this.bufferSize.get(), getId(), this.bufferSizeLimit);
String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason);
}
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,24 +26,23 @@
import org.springframework.web.socket.WebSocketSession;

/**
* A contract for handling WebSocket messages as part of a higher level protocol, referred
* to as "sub-protocol" in the WebSocket RFC specification. Handles both
* A contract for handling WebSocket messages as part of a higher level protocol,
* referred to as "sub-protocol" in the WebSocket RFC specification. Handles both
* {@link WebSocketMessage}s from a client as well as {@link Message}s to a client.
* <p>
* Implementations of this interface can be configured on a
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to delegate
* messages to based on the sub-protocol requested by the client through the
* {@code Sec-WebSocket-Protocol} request header.
*
* <p>Implementations of this interface can be configured on a
* {@link SubProtocolWebSocketHandler} which selects a sub-protocol handler to
* delegate messages to based on the sub-protocol requested by the client through
* the {@code Sec-WebSocket-Protocol} request header.
*
* @author Andy Wilkinson
* @author Rossen Stoyanchev
*
* @since 4.0
*/
public interface SubProtocolHandler {

/**
* Return the list of sub-protocols supported by this handler, never {@code null}.
* Return the list of sub-protocols supported by this handler (never {@code null}).
*/
List<String> getSupportedProtocols();

Expand All @@ -53,12 +52,11 @@ public interface SubProtocolHandler {
* @param message the client message
* @param outputChannel an output channel to send messages to
*/
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message,
MessageChannel outputChannel) throws Exception;
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
throws Exception;

/**
* Handle the given {@link Message} to the client associated with the given WebSocket
* session.
* Handle the given {@link Message} to the client associated with the given WebSocket session.
* @param session the client session
* @param message the client message
*/
Expand All @@ -84,7 +82,7 @@ void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> messa
* @param closeStatus the reason why the session was closed
* @param outputChannel a channel
*/
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus,
MessageChannel outputChannel) throws Exception;
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
throws Exception;

}
Expand Up @@ -16,7 +16,6 @@

package org.springframework.web.socket.messaging;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -292,7 +291,7 @@ public void afterConnectionEstablished(WebSocketSession session) throws Exceptio
return;
}
this.stats.incrementSessionCount(session);
session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
session = decorateSession(session);
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
}
Expand Down Expand Up @@ -377,6 +376,23 @@ public boolean supportsPartialMessages() {
}


/**
* Decorate the given {@link WebSocketSession}, if desired.
* <p>The default implementation builds a {@link ConcurrentWebSocketSessionDecorator}
* with the configured {@link #getSendTimeLimit() send-time limit} and
* {@link #getSendBufferSizeLimit() buffer-size limit}.
* @param session the original {@code WebSocketSession}
* @return the decorated {@code WebSocketSession}, or potentially the given session as-is
* @since 4.3.13
*/
protected WebSocketSession decorateSession(WebSocketSession session) {
return new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
}

/**
* Find a {@link SubProtocolHandler} for the given session.
* @param session the {@code WebSocketSession} to find a handler for
*/
protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) {
String protocol = null;
try {
Expand Down Expand Up @@ -432,12 +448,11 @@ private String resolveSessionId(Message<?> message) {
* When a session is connected through a higher-level protocol it has a chance
* to use heartbeat management to shut down sessions that are too slow to send
* or receive messages. However, after a WebSocketSession is established and
* before the higher level protocol is fully connected there is a possibility
* for sessions to hang. This method checks and closes any sessions that have
* been connected for more than 60 seconds without having received a single
* message.
* before the higher level protocol is fully connected there is a possibility for
* sessions to hang. This method checks and closes any sessions that have been
* connected for more than 60 seconds without having received a single message.
*/
private void checkSessions() throws IOException {
private void checkSessions() {
long currentTime = System.currentTimeMillis();
if (!isRunning() || (currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE)) {
return;
Expand Down Expand Up @@ -497,12 +512,13 @@ private static class WebSocketSessionHolder {

private final WebSocketSession session;

private final long createTime = System.currentTimeMillis();
private final long createTime;

private volatile boolean handledMessages;
private volatile boolean hasHandledMessages;

private WebSocketSessionHolder(WebSocketSession session) {
public WebSocketSessionHolder(WebSocketSession session) {
this.session = session;
this.createTime = System.currentTimeMillis();
}

public WebSocketSession getSession() {
Expand All @@ -514,17 +530,17 @@ public long getCreateTime() {
}

public void setHasHandledMessages() {
this.handledMessages = true;
this.hasHandledMessages = true;
}

public boolean hasHandledMessages() {
return this.handledMessages;
return this.hasHandledMessages;
}

@Override
public String toString() {
return "WebSocketSessionHolder[session=" + this.session + ", createTime=" +
this.createTime + ", hasHandledMessages=" + this.handledMessages + "]";
this.createTime + ", hasHandledMessages=" + this.hasHandledMessages + "]";
}
}

Expand Down

0 comments on commit 5809f5b

Please sign in to comment.