Skip to content

Commit

Permalink
UNDERTOW-770 Ability to shutdown acceptor and close all connections w…
Browse files Browse the repository at this point in the history
…/o terminating workers
  • Loading branch information
stuartwdouglas committed Oct 7, 2018
1 parent 24393ba commit af51760
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 18 deletions.
33 changes: 33 additions & 0 deletions core/src/main/java/io/undertow/Undertow.java
Expand Up @@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -584,6 +585,7 @@ public static class ListenerInfo {
private final OpenListener openListener;
private final UndertowXnioSsl ssl;
private final AcceptingChannel<? extends StreamConnection> channel;
private volatile boolean suspended = false;

public ListenerInfo(String protcol, SocketAddress address, OpenListener openListener, UndertowXnioSsl ssl, AcceptingChannel<? extends StreamConnection> channel) {
this.protcol = protcol;
Expand Down Expand Up @@ -615,6 +617,37 @@ public void setSslContext(SSLContext sslContext) {
}
}

public synchronized void suspend() {
suspended = true;
channel.suspendAccepts();
CountDownLatch latch = new CountDownLatch(1);
//the channel may be in the middle of an accept, we need to close from the IO thread
channel.getIoThread().execute(new Runnable() {
@Override
public void run() {
try {
openListener.closeConnections();
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public synchronized void resume() {
suspended = false;
channel.resumeAccepts();
}

public boolean isSuspended() {
return suspended;
}

public ConnectorStatistics getConnectorStatistics() {
return openListener.getConnectorStatistics();
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/undertow/server/OpenListener.java
Expand Up @@ -66,4 +66,11 @@ public interface OpenListener extends ChannelListener<StreamConnection> {
* @return The connector statistics, or null if statistics gathering is disabled.
*/
ConnectorStatistics getConnectorStatistics();

/**
* Close all active connections that were handled by this listener
*/
default void closeConnections() {
//nnop by default
}
}
Expand Up @@ -43,6 +43,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static io.undertow.UndertowOptions.DECODE_URL;
import static io.undertow.UndertowOptions.URL_CHARSET;
Expand All @@ -52,6 +55,8 @@
*/
public class AjpOpenListener implements OpenListener {

private final Set<AjpServerConnection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final ByteBufferPool bufferPool;
private final int bufferSize;

Expand Down Expand Up @@ -134,6 +139,16 @@ public void handleEvent(final StreamConnection channel) {
connection.addCloseListener(closeListener);
}
connection.setAjpReadListener(readListener);

connections.add(connection);
connection.addCloseListener(new ServerConnection.CloseListener() {
@Override
public void closed(ServerConnection c) {
connections.remove(connection);
}
});


readListener.startRequest();
channel.getSourceChannel().setReadListener(readListener);
readListener.handleEvent(channel.getSourceChannel());
Expand Down Expand Up @@ -177,6 +192,13 @@ public ConnectorStatistics getConnectorStatistics() {
return null;
}

@Override
public void closeConnections() {
for(AjpServerConnection i : connections) {
IoUtils.safeClose(i);
}
}

public String getScheme() {
return scheme;
}
Expand Down
Expand Up @@ -166,6 +166,13 @@ public ConnectorStatistics getConnectorStatistics() {
return null;
}

@Override
public void closeConnections() {
for(Map.Entry<String, ListenerEntry> i : listeners.entrySet()) {
i.getValue().listener.closeConnections();
}
}


private static class ListenerEntry implements Comparable<ListenerEntry> {
final DelegateOpenListener listener;
Expand Down
Expand Up @@ -18,6 +18,19 @@

package io.undertow.server.protocol.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.xnio.ChannelListener;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Pool;
import org.xnio.StreamConnection;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.UndertowOptions;
Expand All @@ -26,22 +39,14 @@
import io.undertow.conduits.IdleTimeoutConduit;
import io.undertow.conduits.ReadTimeoutStreamSourceConduit;
import io.undertow.conduits.WriteTimeoutStreamSinkConduit;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.ConnectorStatistics;
import io.undertow.server.ConnectorStatisticsImpl;
import io.undertow.server.DelegateOpenListener;
import io.undertow.server.HttpHandler;
import io.undertow.server.ServerConnection;
import io.undertow.server.XnioByteBufferPool;
import org.xnio.ChannelListener;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import org.xnio.Options;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import org.xnio.Pool;
import org.xnio.StreamConnection;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Open listener for HTTP server. XNIO should be set up to chain the accept handler to post-accept open
Expand All @@ -51,6 +56,8 @@
*/
public final class HttpOpenListener implements ChannelListener<StreamConnection>, DelegateOpenListener {

private final Set<HttpServerConnection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final ByteBufferPool bufferPool;
private final int bufferSize;

Expand Down Expand Up @@ -92,6 +99,7 @@ public HttpOpenListener(final ByteBufferPool pool, final OptionMap undertowOptio
public void handleEvent(StreamConnection channel) {
handleEvent(channel, null);
}

@Override
public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer) {
if (UndertowLogger.REQUEST_LOGGER.isTraceEnabled()) {
Expand All @@ -102,7 +110,7 @@ public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer)
try {
Integer readTimeout = channel.getOption(Options.READ_TIMEOUT);
Integer idle = undertowOptions.get(UndertowOptions.IDLE_TIMEOUT);
if(idle != null) {
if (idle != null) {
IdleTimeoutConduit conduit = new IdleTimeoutConduit(channel);
channel.getSourceChannel().setConduit(conduit);
channel.getSinkChannel().setConduit(conduit);
Expand All @@ -121,7 +129,7 @@ public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer)
IoUtils.safeClose(channel);
UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
}
if(statisticsEnabled) {
if (statisticsEnabled) {
channel.getSinkChannel().setConduit(new BytesSentStreamSinkConduit(channel.getSinkChannel().getConduit(), connectorStatistics.sentAccumulator()));
channel.getSourceChannel().setConduit(new BytesReceivedStreamSourceConduit(channel.getSourceChannel().getConduit(), connectorStatistics.receivedAccumulator()));
}
Expand All @@ -130,17 +138,24 @@ public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer)
HttpReadListener readListener = new HttpReadListener(connection, parser, statisticsEnabled ? connectorStatistics : null);


if(buffer != null) {
if(buffer.getBuffer().hasRemaining()) {
if (buffer != null) {
if (buffer.getBuffer().hasRemaining()) {
connection.setExtraBytes(buffer);
} else {
buffer.close();
}
}
if(connectorStatistics != null && statisticsEnabled) {
if (connectorStatistics != null && statisticsEnabled) {
connectorStatistics.incrementConnectionCount();
}

connections.add(connection);
connection.addCloseListener(new ServerConnection.CloseListener() {
@Override
public void closed(ServerConnection c) {
connections.remove(connection);
}
});
connection.setReadListener(readListener);
readListener.newRequest();
channel.getSourceChannel().setReadListener(readListener);
Expand Down Expand Up @@ -179,10 +194,17 @@ public ByteBufferPool getBufferPool() {

@Override
public ConnectorStatistics getConnectorStatistics() {
if(statisticsEnabled) {
if (statisticsEnabled) {
return connectorStatistics;
}
return null;
}

@Override
public void closeConnections() {
for(HttpServerConnection i : connections) {
IoUtils.safeClose(i);
}
}

}
Expand Up @@ -30,13 +30,18 @@
import io.undertow.server.HttpHandler;
import io.undertow.server.XnioByteBufferPool;
import org.xnio.ChannelListener;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;

import org.xnio.Pool;
import org.xnio.StreamConnection;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


/**
Expand All @@ -45,6 +50,11 @@
* @author Stuart Douglas
*/
public final class Http2OpenListener implements ChannelListener<StreamConnection>, DelegateOpenListener {


private final Set<Http2Channel> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());


public static final String HTTP2 = "h2";
@Deprecated
public static final String HTTP2_14 = "h2-14";
Expand Down Expand Up @@ -116,6 +126,14 @@ public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer)
connectorStatistics.incrementConnectionCount();
http2Channel.addCloseTask(closeTask);
}

connections.add(http2Channel);
http2Channel.addCloseTask(new ChannelListener<Http2Channel>() {
@Override
public void handleEvent(Http2Channel channel) {
connections.remove(channel);
}
});
http2Channel.getReceiveSetter().set(new Http2ReceiveListener(rootHandler, getUndertowOptions(), bufferSize, connectorStatistics));
http2Channel.resumeReceives();

Expand All @@ -128,6 +146,14 @@ public ConnectorStatistics getConnectorStatistics() {
}
return null;
}

@Override
public void closeConnections() {
for(Http2Channel i : connections) {
IoUtils.safeClose(i);
}
}

@Override
public HttpHandler getRootHandler() {
return rootHandler;
Expand Down
10 changes: 9 additions & 1 deletion core/src/test/java/io/undertow/testutils/DefaultServer.java
Expand Up @@ -617,7 +617,6 @@ public static SSLContext getServerSslContext() {
* single client. Client cert mode is not set by default
*/
public static void startSSLServer(OptionMap optionMap) throws IOException {
SSLContext serverContext = getServerSslContext();
clientSslContext = createClientSslContext();
startSSLServer(optionMap, proxyAcceptListener != null ? proxyAcceptListener : acceptListener);
}
Expand Down Expand Up @@ -692,6 +691,11 @@ public static void stopSSLServer() throws IOException {
sslServer = null;
}
clientSslContext = null;
if(proxyOpenListener != null) {
proxyOpenListener.closeConnections();
} else {
openListener.closeConnections();
}
}

public static String getHostAddress(String serverName) {
Expand Down Expand Up @@ -730,6 +734,10 @@ public static void setUndertowOptions(final OptionMap options) {
builder.set(UndertowOptions.ENABLE_HTTP2, true);
}
openListener.setUndertowOptions(builder.getMap());
openListener.closeConnections();
if(proxyOpenListener != null) {
proxyOpenListener.closeConnections();
}
if (loadBalancingProxyClient != null) {
loadBalancingProxyClient.closeCurrentConnections();
}
Expand Down

0 comments on commit af51760

Please sign in to comment.