Skip to content

Commit

Permalink
UNDERTOW-1443 Websockets should start worker lazily
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 27, 2018
1 parent 93ece1d commit 31a29c9
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 57 deletions.
10 changes: 10 additions & 0 deletions core/src/test/java/io/undertow/testutils/DefaultServer.java
Expand Up @@ -33,6 +33,7 @@
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException; import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.function.Supplier;


import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.KeyManagerFactory;
Expand Down Expand Up @@ -247,6 +248,15 @@ public static ByteBufferPool getBufferPool() {
return pool; return pool;
} }


public static Supplier<XnioWorker> getWorkerSupplier() {
return new Supplier<XnioWorker>() {
@Override
public XnioWorker get() {
return getWorker();
}
};
}

@Override @Override
public Description getDescription() { public Description getDescription() {
return super.getDescription(); return super.getDescription();
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.function.Supplier;


/** /**
* @author Stuart Douglas * @author Stuart Douglas
Expand All @@ -57,15 +58,7 @@ public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servl
if (info == null) { if (info == null) {
return; return;
} }
XnioWorker worker = info.getWorker(); Supplier<XnioWorker> worker = info.getWorker();
if(worker == null) {
ServerWebSocketContainer defaultContainer = UndertowContainerProvider.getDefaultContainer();
if(defaultContainer == null) {
throw JsrWebSocketLogger.ROOT_LOGGER.xnioWorkerWasNullAndNoDefault();
}
JsrWebSocketLogger.ROOT_LOGGER.xnioWorkerWasNull();
worker = defaultContainer.getXnioWorker();
}
ByteBufferPool buffers = info.getBuffers(); ByteBufferPool buffers = info.getBuffers();
if(buffers == null) { if(buffers == null) {
ServerWebSocketContainer defaultContainer = UndertowContainerProvider.getDefaultContainer(); ServerWebSocketContainer defaultContainer = UndertowContainerProvider.getDefaultContainer();
Expand Down
Expand Up @@ -87,6 +87,7 @@
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;


import static java.lang.System.*; import static java.lang.System.*;


Expand Down Expand Up @@ -114,7 +115,7 @@ public class ServerWebSocketContainer implements ServerContainer, Closeable {
*/ */
private final TreeSet<PathTemplate> seenPaths = new TreeSet<>(); private final TreeSet<PathTemplate> seenPaths = new TreeSet<>();


private final XnioWorker xnioWorker; private final Supplier<XnioWorker> xnioWorker;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final boolean dispatchToWorker; private final boolean dispatchToWorker;
private final InetSocketAddress clientBindAddress; private final InetSocketAddress clientBindAddress;
Expand All @@ -137,19 +138,19 @@ public class ServerWebSocketContainer implements ServerContainer, Closeable {


private volatile boolean closed = false; private volatile boolean closed = false;


public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final XnioWorker xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, boolean clientMode) { public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, boolean clientMode) {
this(classIntrospecter, ServerWebSocketContainer.class.getClassLoader(), xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, null, null); this(classIntrospecter, ServerWebSocketContainer.class.getClassLoader(), xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, null, null);
} }


public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, XnioWorker xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker) { public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker) {
this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, null, null); this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, null, null);
} }


public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, XnioWorker xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler) { public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler) {
this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, clientBindAddress, reconnectHandler, Collections.emptyList()); this(classIntrospecter, classLoader, xnioWorker, bufferPool, threadSetupHandlers, dispatchToWorker, clientBindAddress, reconnectHandler, Collections.emptyList());
} }


public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, XnioWorker xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler, List<Extension> installedExtensions) { public ServerWebSocketContainer(final ClassIntrospecter classIntrospecter, final ClassLoader classLoader, Supplier<XnioWorker> xnioWorker, ByteBufferPool bufferPool, List<ThreadSetupHandler> threadSetupHandlers, boolean dispatchToWorker, InetSocketAddress clientBindAddress, WebSocketReconnectHandler reconnectHandler, List<Extension> installedExtensions) {
this.classIntrospecter = classIntrospecter; this.classIntrospecter = classIntrospecter;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.xnioWorker = xnioWorker; this.xnioWorker = xnioWorker;
Expand Down Expand Up @@ -210,14 +211,14 @@ public Session connectToServer(final Object annotatedEndpointInstance, final URI
Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance)); Endpoint instance = config.getFactory().createInstance(new ImmediateInstanceHandle<>(annotatedEndpointInstance));
XnioSsl ssl = null; XnioSsl ssl = null;
for (WebsocketClientSslProvider provider : clientSslProviders) { for (WebsocketClientSslProvider provider : clientSslProviders) {
ssl = provider.getSsl(xnioWorker, annotatedEndpointInstance, path); ssl = provider.getSsl(xnioWorker.get(), annotatedEndpointInstance, path);
if (ssl != null) { if (ssl != null) {
break; break;
} }
} }
if(ssl == null) { if(ssl == null) {
try { try {
ssl = new UndertowXnioSsl(xnioWorker.getXnio(), OptionMap.EMPTY, SSLContext.getDefault()); ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
//ignore //ignore
} }
Expand Down Expand Up @@ -258,14 +259,14 @@ public Session connectToServer(Class<?> aClass, URI uri) throws DeploymentExcept
InstanceHandle<?> instance = config.getInstanceFactory().createInstance(); InstanceHandle<?> instance = config.getInstanceFactory().createInstance();
XnioSsl ssl = null; XnioSsl ssl = null;
for (WebsocketClientSslProvider provider : clientSslProviders) { for (WebsocketClientSslProvider provider : clientSslProviders) {
ssl = provider.getSsl(xnioWorker, aClass, uri); ssl = provider.getSsl(xnioWorker.get(), aClass, uri);
if (ssl != null) { if (ssl != null) {
break; break;
} }
} }
if(ssl == null) { if(ssl == null) {
try { try {
ssl = new UndertowXnioSsl(xnioWorker.getXnio(), OptionMap.EMPTY, SSLContext.getDefault()); ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
//ignore //ignore
} }
Expand All @@ -284,14 +285,14 @@ public Session connectToServer(final Endpoint endpointInstance, final ClientEndp
ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build(); ClientEndpointConfig cec = config != null ? config : ClientEndpointConfig.Builder.create().build();
XnioSsl ssl = null; XnioSsl ssl = null;
for (WebsocketClientSslProvider provider : clientSslProviders) { for (WebsocketClientSslProvider provider : clientSslProviders) {
ssl = provider.getSsl(xnioWorker, endpointInstance, cec, path); ssl = provider.getSsl(xnioWorker.get(), endpointInstance, cec, path);
if (ssl != null) { if (ssl != null) {
break; break;
} }
} }
if(ssl == null) { if(ssl == null) {
try { try {
ssl = new UndertowXnioSsl(xnioWorker.getXnio(), OptionMap.EMPTY, SSLContext.getDefault()); ssl = new UndertowXnioSsl(xnioWorker.get().getXnio(), OptionMap.EMPTY, SSLContext.getDefault());
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
//ignore //ignore
} }
Expand All @@ -300,7 +301,7 @@ public Session connectToServer(final Endpoint endpointInstance, final ClientEndp
WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getPreferredSubprotocols(), toExtensionList(cec.getExtensions()), cec); WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getPreferredSubprotocols(), toExtensionList(cec.getExtensions()), cec);




WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker, bufferPool, path) WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
.setSsl(ssl) .setSsl(ssl)
.setBindAddress(clientBindAddress) .setBindAddress(clientBindAddress)
.setClientNegotiation(clientNegotiation); .setClientNegotiation(clientNegotiation);
Expand Down Expand Up @@ -478,7 +479,7 @@ private Session connectToServerInternal(final Endpoint endpointInstance, XnioSsl
WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getConfig().getPreferredSubprotocols(), toExtensionList(cec.getConfig().getExtensions()), cec.getConfig()); WebSocketClientNegotiation clientNegotiation = new ClientNegotiation(cec.getConfig().getPreferredSubprotocols(), toExtensionList(cec.getConfig().getExtensions()), cec.getConfig());




WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker, bufferPool, path) WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(xnioWorker.get(), bufferPool, path)
.setSsl(ssl) .setSsl(ssl)
.setBindAddress(clientBindAddress) .setBindAddress(clientBindAddress)
.setClientNegotiation(clientNegotiation); .setClientNegotiation(clientNegotiation);
Expand Down Expand Up @@ -840,7 +841,7 @@ public ByteBufferPool getBufferPool() {
} }


public XnioWorker getXnioWorker() { public XnioWorker getXnioWorker() {
return xnioWorker; return xnioWorker.get();
} }


private static List<WebSocketExtension> toExtensionList(final List<Extension> extensions) { private static List<WebSocketExtension> toExtensionList(final List<Extension> extensions) {
Expand Down
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import javax.websocket.ContainerProvider; import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;


Expand Down Expand Up @@ -83,16 +85,29 @@ static ServerWebSocketContainer getDefaultContainer() {
} }
synchronized (UndertowContainerProvider.class) { synchronized (UndertowContainerProvider.class) {
if (defaultContainer == null) { if (defaultContainer == null) {
try { //this is not great, as we have no way to control the lifecycle
//this is not great, as we have no way to control the lifecycle //but there is not much we can do
//but there is not much we can do //todo: what options should we use here?
//todo: what options should we use here? ByteBufferPool buffers = new DefaultByteBufferPool(directBuffers, 1024, 100, 12);
XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.create(Options.THREAD_DAEMON, true)); defaultContainer = new ServerWebSocketContainer(defaultIntrospector, UndertowContainerProvider.class.getClassLoader(), new Supplier<XnioWorker>() {
ByteBufferPool buffers = new DefaultByteBufferPool(directBuffers, 1024, 100, 12); volatile XnioWorker worker;
defaultContainer = new ServerWebSocketContainer(defaultIntrospector, UndertowContainerProvider.class.getClassLoader(), worker, buffers, Collections.EMPTY_LIST, !invokeInIoThread);
} catch (IOException e) { @Override
throw new RuntimeException(e); public XnioWorker get() {
} if(worker == null) {
synchronized (this) {
if(worker == null) {
try {
worker = Xnio.getInstance().createWorker(OptionMap.create(Options.THREAD_DAEMON, true));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
return worker;
}
}, buffers, Collections.EMPTY_LIST, !invokeInIoThread);
} }
return defaultContainer; return defaultContainer;
} }
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.function.Supplier;


/** /**
* Web socket deployment information * Web socket deployment information
Expand All @@ -39,7 +40,18 @@ public class WebSocketDeploymentInfo implements Cloneable {


public static final String ATTRIBUTE_NAME = "io.undertow.websockets.jsr.WebSocketDeploymentInfo"; public static final String ATTRIBUTE_NAME = "io.undertow.websockets.jsr.WebSocketDeploymentInfo";


private XnioWorker worker; private Supplier<XnioWorker> worker = new Supplier<XnioWorker>() {

volatile XnioWorker worker;

@Override
public XnioWorker get() {
if(worker != null) {
return worker;
}
return worker = UndertowContainerProvider.getDefaultContainer().getXnioWorker();
}
};
private ByteBufferPool buffers; private ByteBufferPool buffers;
private boolean dispatchToWorkerThread = false; private boolean dispatchToWorkerThread = false;
private final List<Class<?>> annotatedEndpoints = new ArrayList<>(); private final List<Class<?>> annotatedEndpoints = new ArrayList<>();
Expand All @@ -49,15 +61,25 @@ public class WebSocketDeploymentInfo implements Cloneable {
private String clientBindAddress = null; private String clientBindAddress = null;
private WebSocketReconnectHandler reconnectHandler; private WebSocketReconnectHandler reconnectHandler;


public XnioWorker getWorker() { public Supplier<XnioWorker> getWorker() {
return worker; return worker;
} }


public WebSocketDeploymentInfo setWorker(XnioWorker worker) { public WebSocketDeploymentInfo setWorker(Supplier<XnioWorker> worker) {
this.worker = worker; this.worker = worker;
return this; return this;
} }


public WebSocketDeploymentInfo setWorker(XnioWorker worker) {
this.worker = new Supplier<XnioWorker>() {
@Override
public XnioWorker get() {
return worker;
}
};
return this;
}

public ByteBufferPool getBuffers() { public ByteBufferPool getBuffers() {
return buffers; return buffers;
} }
Expand Down
Expand Up @@ -77,7 +77,7 @@ public static void setup() throws Exception {
.addServletContextAttribute(WebSocketDeploymentInfo.ATTRIBUTE_NAME, .addServletContextAttribute(WebSocketDeploymentInfo.ATTRIBUTE_NAME,
new WebSocketDeploymentInfo() new WebSocketDeploymentInfo()
.setBuffers(DefaultServer.getBufferPool()) .setBuffers(DefaultServer.getBufferPool())
.setWorker(DefaultServer.getWorker()) .setWorker(DefaultServer.getWorkerSupplier())
.addListener(new WebSocketDeploymentInfo.ContainerReadyListener() { .addListener(new WebSocketDeploymentInfo.ContainerReadyListener() {
@Override @Override
public void ready(ServerWebSocketContainer container) { public void ready(ServerWebSocketContainer container) {
Expand Down

0 comments on commit 31a29c9

Please sign in to comment.