Skip to content

Commit

Permalink
INT-3515: Fix Some WebSockets Issues
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3515

Change test suite to the Tomcat according IO

INT-3515: IllegalStateException for the `WebSocketInboundChannelAdapter`, when `useBroker = true`, but there is no Broker Relay in the Context
  • Loading branch information
artembilan authored and garyrussell committed Sep 25, 2014
1 parent 2587ed6 commit a5bdddd
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 125 deletions.
12 changes: 3 additions & 9 deletions build.gradle
Expand Up @@ -108,6 +108,7 @@ subprojects { subproject ->
saajImplVersion = '1.3.23'
servletApiVersion = '3.1.0'
slf4jVersion = "1.7.6"
tomcatVersion = "7.0.55"
smack3Version = '3.2.1'
smackVersion = '4.0.0'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '1.4.0.M1'
Expand Down Expand Up @@ -565,15 +566,8 @@ project('spring-integration-websocket') {

compile ("org.springframework:spring-webmvc:$springVersion", optional)

testCompile("org.eclipse.jetty:jetty-webapp:$jettyVersion") {
exclude group: "javax.servlet", module: "javax.servlet"
}
testCompile("org.eclipse.jetty.websocket:websocket-server:$jettyVersion") {
exclude group: "javax.servlet", module: "javax.servlet"
}
testCompile "org.eclipse.jetty.websocket:websocket-client:$jettyVersion"
testCompile"org.eclipse.jetty:jetty-client:$jettyVersion"
testCompile "org.slf4j:slf4j-jcl:$slf4jVersion"
testCompile "org.apache.tomcat.embed:tomcat-embed-websocket:$tomcatVersion"
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:${tomcatVersion}")
}
}

Expand Down
Expand Up @@ -116,6 +116,10 @@ public List<String> getSubProtocols() {
return Collections.unmodifiableList(protocols);
}

public Map<String, WebSocketSession> getSessions() {
return Collections.unmodifiableMap(this.sessions);
}

public WebSocketSession getSession(String sessionId) throws Exception {
WebSocketSession session = this.sessions.get(sessionId);
Assert.notNull(session, "Session not found for id '" + sessionId + "'");
Expand Down
Expand Up @@ -50,11 +50,9 @@ protected boolean shouldGenerateIdAsFallback() {
@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
builder.addConstructorArgReference(element.getAttribute("client"))
.addConstructorArgValue(element.getAttribute("uri"));
String uriVariables = element.getAttribute("uri-variables");
if (StringUtils.hasText(uriVariables)) {
builder.addConstructorArgValue(StringUtils.commaDelimitedListToStringArray(uriVariables));
}
.addConstructorArgValue(element.getAttribute("uri"))
.addConstructorArgValue(StringUtils.commaDelimitedListToStringArray(element.getAttribute("uri-variables")));

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-buffer-size-limit");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-time-limit");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "origin");
Expand Down
Expand Up @@ -189,10 +189,9 @@ protected void onInit() {
break;
}
}
if (this.brokerHandler == null) {
logger.warn("'AbstractBrokerMessageHandler' isn't present in the application context. " +
"The non-MESSAGE WebSocketMessages will be ignored.");
}
Assert.state(this.brokerHandler != null,
"WebSocket Broker Relay isn't present in the application context; " +
"it is required when 'useBroker = true'.");
}
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.messaging.SubProtocolHandler;

Expand Down Expand Up @@ -108,7 +109,7 @@ public SubProtocolHandlerRegistry(List<SubProtocolHandler> protocolHandlers,
public SubProtocolHandler findProtocolHandler(WebSocketSession session) {
SubProtocolHandler handler;
String protocol = session.getAcceptedProtocol();
if (protocol != null) {
if (StringUtils.hasText(protocol)) {
handler = this.protocolHandlers.get(protocol);
Assert.state(handler != null,
"No handler for sub-protocol '" + protocol + "', handlers = " + this.protocolHandlers);
Expand Down
Expand Up @@ -25,7 +25,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -40,15 +39,15 @@
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

/**
* @author Artem Bilan
* @since 4.1
*/
public class ClientWebSocketContainerTests {

private final static JettyWebSocketTestServer server = new JettyWebSocketTestServer(TestServerConfig.class);
private final static TomcatWebSocketTestServer server = new TomcatWebSocketTestServer(TestServerConfig.class);

@BeforeClass
public static void setup() throws Exception {
Expand All @@ -63,7 +62,7 @@ public static void tearDown() throws Exception {
@Test
public void testClientWebSocketContainer() throws Exception {
ClientWebSocketContainer container =
new ClientWebSocketContainer(new JettyWebSocketClient(), server.getWsBaseUrl() + "/ws/websocket");
new ClientWebSocketContainer(new StandardWebSocketClient(), server.getWsBaseUrl() + "/ws/websocket");

TestWebSocketListener messageListener = new TestWebSocketListener();
container.setMessageListener(messageListener);
Expand All @@ -75,8 +74,7 @@ public void testClientWebSocketContainer() throws Exception {
assertTrue(session.isOpen());
assertEquals("v10.stomp", session.getAcceptedProtocol());

//TODO Jetty Server treats empty ByteBuffer as 'null' for PongMessage
session.sendMessage(new PingMessage(ByteBuffer.wrap("ping".getBytes())));
session.sendMessage(new PingMessage());

assertTrue(messageListener.messageLatch.await(10, TimeUnit.SECONDS));

Expand Down

This file was deleted.

@@ -0,0 +1,101 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.websocket;

import java.io.File;
import java.io.IOException;

import org.apache.catalina.Context;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.tomcat.websocket.server.WsContextListener;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.SocketUtils;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
import org.springframework.web.servlet.DispatcherServlet;

/**
* @author Rossen Stoyanchev
* @author Artem Bilan
* @since 4.1
*/
public class TomcatWebSocketTestServer implements InitializingBean, DisposableBean {

private final Tomcat tomcatServer;

private final int port;

private final AnnotationConfigWebApplicationContext serverContext;

public TomcatWebSocketTestServer(Class<?>... serverConfigs) {
this.port = SocketUtils.findAvailableTcpPort();
Connector connector = new Connector(Http11NioProtocol.class.getName());
connector.setPort(this.port);

File baseDir = createTempDir("tomcat");
String baseDirPath = baseDir.getAbsolutePath();

this.tomcatServer = new Tomcat();
this.tomcatServer.setBaseDir(baseDirPath);
this.tomcatServer.setPort(this.port);
this.tomcatServer.getService().addConnector(connector);
this.tomcatServer.setConnector(connector);

this.serverContext = new AnnotationConfigWebApplicationContext();
this.serverContext.register(serverConfigs);

Context context = this.tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
context.addApplicationListener(WsContextListener.class.getName());
Tomcat.addServlet(context, "dispatcherServlet", new DispatcherServlet(this.serverContext)).setAsyncSupported(true);
context.addServletMapping("/", "dispatcherServlet");
}

private File createTempDir(String prefix) {
try {
File tempFolder = File.createTempFile(prefix + ".", "." + this.port);
tempFolder.delete();
tempFolder.mkdir();
tempFolder.deleteOnExit();
return tempFolder;
}
catch (IOException ex) {
throw new RuntimeException("Unable to create temp directory", ex);
}
}

public AnnotationConfigWebApplicationContext getServerContext() {
return this.serverContext;
}

public String getWsBaseUrl() {
return "ws://localhost:" + this.port;
}

@Override
public void afterPropertiesSet() throws Exception {
this.tomcatServer.start();
}

@Override
public void destroy() throws Exception {
this.tomcatServer.stop();
}

}
Expand Up @@ -52,7 +52,7 @@
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.JettyWebSocketTestServer;
import org.springframework.integration.websocket.TomcatWebSocketTestServer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
Expand All @@ -72,12 +72,13 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy;
import org.springframework.web.socket.server.standard.TomcatRequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

/**
Expand Down Expand Up @@ -247,13 +248,13 @@ public void sendMessageToGateway() throws Exception {
public static class ContextConfiguration {

@Bean
public JettyWebSocketTestServer server() {
return new JettyWebSocketTestServer(ServerConfig.class);
public TomcatWebSocketTestServer server() {
return new TomcatWebSocketTestServer(ServerConfig.class);
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(new JettyWebSocketClient(), server().getWsBaseUrl() + "/ws/websocket");
return new ClientWebSocketContainer(new StandardWebSocketClient(), server().getWsBaseUrl() + "/ws/websocket");
}

@Bean
Expand Down Expand Up @@ -370,7 +371,7 @@ public ExpressionEvaluatingTransformer greetingTransformer() {

@Bean
public DefaultHandshakeHandler handshakeHandler() {
return new DefaultHandshakeHandler(new JettyRequestUpgradeStrategy());
return new DefaultHandshakeHandler(new TomcatRequestUpgradeStrategy());
}

@Override
Expand Down
Expand Up @@ -40,8 +40,8 @@
import org.springframework.integration.transformer.ObjectToStringTransformer;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.JettyWebSocketTestServer;
import org.springframework.integration.websocket.TestServerConfig;
import org.springframework.integration.websocket.TomcatWebSocketTestServer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
Expand All @@ -56,7 +56,7 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.sockjs.client.SockJsClient;
Expand Down Expand Up @@ -99,13 +99,13 @@ public void testWebSocketOutboundMessageHandler() throws Exception {
public static class ContextConfiguration {

@Bean
public JettyWebSocketTestServer server() {
return new JettyWebSocketTestServer(ServerFlowConfig.class);
public TomcatWebSocketTestServer server() {
return new TomcatWebSocketTestServer(ServerFlowConfig.class);
}

@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.<Transport>singletonList(new WebSocketTransport(new JettyWebSocketClient())));
return new SockJsClient(Collections.<Transport>singletonList(new WebSocketTransport(new StandardWebSocketClient())));
}

@Bean
Expand Down

0 comments on commit a5bdddd

Please sign in to comment.