diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 0a301c13b4db..8d48028f9da0 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -39,7 +39,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; public class HTTP2ClientConnectionFactory implements ClientConnectionFactory @@ -69,36 +68,19 @@ public Connection newConnection(EndPoint endPoint, Map context) HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); Parser parser = new Parser(byteBufferPool, session, 4096, 8192); - ReservedThreadExecutor reservedExecutor = provideReservedThreadExecutor(client, executor); - - HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, reservedExecutor, endPoint, + HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener); connection.addListener(connectionListener); return customize(connection, context); } - protected ReservedThreadExecutor provideReservedThreadExecutor(HTTP2Client client, Executor executor) - { - synchronized (this) - { - ReservedThreadExecutor reservedExecutor = client.getBean(ReservedThreadExecutor.class); - if (reservedExecutor == null) - { - // TODO: see HTTP2Connection.FillableCallback - reservedExecutor = new ReservedThreadExecutor(executor, 0); - client.addManaged(reservedExecutor); - } - return reservedExecutor; - } - } - private class HTTP2ClientConnection extends HTTP2Connection implements Callback { private final HTTP2Client client; private final Promise promise; private final Session.Listener listener; - private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) + private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) { super(byteBufferPool, executor, endpoint, parser, session, bufferSize); this.client = client; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 6375f01ebd8d..2555d99008c8 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http2.parser.Parser; @@ -35,7 +36,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; +import org.eclipse.jetty.util.thread.TryExecutor; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener @@ -51,14 +52,15 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. private final int bufferSize; private final ExecutionStrategy strategy; - public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) + public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) { - super(endPoint, executor.getExecutor()); + super(endPoint, executor); this.byteBufferPool = byteBufferPool; this.parser = parser; this.session = session; this.bufferSize = bufferSize; - this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor); + // TODO HTTP2 cannot use EWYK without fix for #1803 + this.strategy = new EatWhatYouKill(producer, new TryExecutor.NoTryExecutor(executor)); LifeCycle.start(strategy); } diff --git a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml index 1d6423589e36..2de7f348d0ae 100644 --- a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml +++ b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml @@ -12,7 +12,6 @@ - diff --git a/jetty-http2/http2-server/src/main/config/modules/http2.mod b/jetty-http2/http2-server/src/main/config/modules/http2.mod index 2ffa068ede6f..e5def621a6a7 100644 --- a/jetty-http2/http2-server/src/main/config/modules/http2.mod +++ b/jetty-http2/http2-server/src/main/config/modules/http2.mod @@ -27,7 +27,3 @@ etc/jetty-http2.xml ## Initial session receive window (client to server) # jetty.http2.initialSessionRecvWindow=1048576 - -## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads) -# jetty.http2.reservedThreads=-1 - diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index a42d5ccd3997..1128f3cd574e 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; @ManagedObject public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory @@ -49,7 +48,6 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private long streamIdleTimeout; - private int reservedThreads; public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -143,20 +141,23 @@ public void setStreamIdleTimeout(long streamIdleTimeout) } /** - * @see ReservedThreadExecutor - * @return The number of reserved threads + * @return -1 + * @deprecated */ - @ManagedAttribute("The number of threads reserved for high priority tasks") + @Deprecated public int getReservedThreads() { - return reservedThreads; + return -1; } + /** + * @param threads ignored + * @deprecated + */ + @Deprecated public void setReservedThreads(int threads) { - // TODO: see also HTTP2Connection.FillableCallback. - // TODO: currently disabled since the only value that works is 0. -// this.reservedThreads = threads; + throw new UnsupportedOperationException(); } public HttpConfiguration getHttpConfiguration() @@ -183,30 +184,14 @@ public Connection newConnection(Connector connector, EndPoint endPoint) streamIdleTimeout = endPoint.getIdleTimeout(); session.setStreamIdleTimeout(streamIdleTimeout); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); - - ReservedThreadExecutor executor = provideReservedThreadExecutor(connector); ServerParser parser = newServerParser(connector, session); - HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor, + HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener); connection.addListener(connectionListener); return configure(connection, connector, endPoint); } - protected ReservedThreadExecutor provideReservedThreadExecutor(Connector connector) - { - synchronized (this) - { - ReservedThreadExecutor executor = getBean(ReservedThreadExecutor.class); - if (executor == null) - { - executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads()); - addManaged(executor); - } - return executor; - } - } - protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint); protected ServerParser newServerParser(Connector connector, ServerParser.Listener listener) diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index cef4a34498b1..fb18d564a72f 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; import java.util.Queue; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http.BadMessageException; @@ -58,7 +59,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.TypeUtil; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo { @@ -94,7 +94,7 @@ public static boolean isSupportedProtocol(String protocol) private final HttpConfiguration httpConfig; private boolean recycleHttpChannels; - public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) + public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize); this.listener = listener; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index c4fa8f42ad52..e8812f841014 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -48,7 +48,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; @@ -77,7 +76,7 @@ public ManagedSelector(SelectorManager selectorManager, int id) _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class)); + _strategy = new EatWhatYouKill(producer,executor); addBean(_strategy,true); setStopTimeout(5000); } @@ -136,17 +135,17 @@ protected void doStop() throws Exception /** * Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()} - * @param action + * @param update The selector update to apply at next wakeup */ - public void submit(SelectorUpdate action) + public void submit(SelectorUpdate update) { if (LOG.isDebugEnabled()) - LOG.debug("Queued change {} on {}", action, this); + LOG.debug("Queued change {} on {}", update, this); Selector selector = null; synchronized(ManagedSelector.this) { - _updates.offer(action); + _updates.offer(update); if (_selecting) { @@ -265,15 +264,15 @@ public void dump(Appendable out, String indent) throws IOException { Selector selector = _selector; List keys = null; - List actions = null; + List updates = null; if (selector != null && selector.isOpen()) { DumpKeys dump = new DumpKeys(); - String actionsAt; + String updatesAt; synchronized(ManagedSelector.this) { - actionsAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); - actions = new ArrayList<>(_updates); + updatesAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); + updates = new ArrayList<>(_updates); _updates.addFirst(dump); _selecting = false; } @@ -282,7 +281,7 @@ public void dump(Appendable out, String indent) throws IOException String keysAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()); if (keys==null) keys = Collections.singletonList("No dump keys retrieved"); - dumpBeans(out, indent, Arrays.asList(new DumpableCollection("actions @ "+actionsAt, actions), + dumpBeans(out, indent, Arrays.asList(new DumpableCollection("updates @ "+updatesAt, updates), new DumpableCollection("keys @ "+keysAt, keys))); } else @@ -295,7 +294,7 @@ public void dump(Appendable out, String indent) throws IOException public String toString() { Selector selector = _selector; - return String.format("%s id=%s keys=%d selected=%d actions=%d", + return String.format("%s id=%s keys=%d selected=%d updates=%d", super.toString(), _id, selector != null && selector.isOpen() ? selector.keys().size() : -1, @@ -377,16 +376,16 @@ private void processUpdates() _updateable.clear(); Selector selector; - int actions; + int updates; synchronized(ManagedSelector.this) { - actions = _updates.size(); - _selecting = actions==0; + updates = _updates.size(); + _selecting = updates==0; selector = _selecting?null:_selector; } if (LOG.isDebugEnabled()) - LOG.debug("actions {}",actions); + LOG.debug("updates {}",updates); if (selector != null) selector.wakeup(); @@ -405,18 +404,18 @@ private boolean select() if (LOG.isDebugEnabled()) LOG.debug("Selector {} woken up from select, {}/{} selected", selector, selected, selector.keys().size()); - int actions; + int updates; synchronized(ManagedSelector.this) { // finished selecting _selecting = false; - actions = _updates.size(); + updates = _updates.size(); } _keys = selector.selectedKeys(); _cursor = _keys.iterator(); if (LOG.isDebugEnabled()) - LOG.debug("Selector {} processing {} keys, {} actions", selector, _keys.size(), actions); + LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates); return true; } @@ -489,7 +488,7 @@ private void updateKeys() { // Do update keys for only previously selected keys. // This will update only those keys whose selection did not cause an - // updateKeys action to be submitted. + // updateKeys update to be submitted. for (SelectionKey key : _keys) updateKey(key); _keys.clear(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 8320d2b22d4f..7da0ee066c86 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -38,11 +38,9 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPoolBudget; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; /** *

{@link SelectorManager} manages a number of {@link ManagedSelector}s that @@ -63,9 +61,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump private final AtomicInteger _selectorIndex = new AtomicInteger(); private final IntUnaryOperator _selectorIndexUpdate; private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; - private int _reservedThreads = -1; private ThreadPoolBudget.Lease _lease; - private ReservedThreadExecutor _reservedThreadExecutor; private static int defaultSelectors(Executor executor) { @@ -133,34 +129,23 @@ public void setConnectTimeout(long milliseconds) } /** - * Get the number of preallocated producing threads - * @see EatWhatYouKill - * @see ReservedThreadExecutor - * @return The number of threads preallocated to producing (default -1). + * @return -1 + * @deprecated */ - @ManagedAttribute("The number of reserved producer threads") + @Deprecated public int getReservedThreads() { - return _reservedThreads; + return -1; } - + /** - * Set the number of reserved threads for high priority tasks. - *

Reserved threads are used to take over producing duties, so that a - * producer thread may immediately consume a task it has produced (EatWhatYouKill - * scheduling). If a reserved thread is not available, then produced tasks must - * be submitted to an executor to be executed by a different thread. - * @see EatWhatYouKill - * @see ReservedThreadExecutor - * @param threads The number of producing threads to preallocate. If - * less that 0 (the default), then a heuristic based on the number of CPUs and - * the thread pool size is used to select the number of threads. If 0, no - * threads are preallocated and the EatWhatYouKill scheduler will be - * disabled and all produced tasks will be executed in a separate thread. + * @param threads ignored + * @deprecated */ + @Deprecated public void setReservedThreads(int threads) { - _reservedThreads = threads; + throw new UnsupportedOperationException(); } /** @@ -182,7 +167,7 @@ public int getSelectorCount() return _selectors.length; } - private ManagedSelector chooseSelector(SelectableChannel channel) + private ManagedSelector chooseSelector() { return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)]; } @@ -199,7 +184,7 @@ private ManagedSelector chooseSelector(SelectableChannel channel) */ public void connect(SelectableChannel channel, Object attachment) { - ManagedSelector set = chooseSelector(channel); + ManagedSelector set = chooseSelector(); set.submit(set.new Connect(channel, attachment)); } @@ -224,7 +209,7 @@ public void accept(SelectableChannel channel) */ public void accept(SelectableChannel channel, Object attachment) { - final ManagedSelector selector = chooseSelector(channel); + final ManagedSelector selector = chooseSelector(); selector.submit(selector.new Accept(channel, attachment)); } @@ -239,7 +224,7 @@ public void accept(SelectableChannel channel, Object attachment) */ public Closeable acceptor(SelectableChannel server) { - final ManagedSelector selector = chooseSelector(null); + final ManagedSelector selector = chooseSelector(); ManagedSelector.Acceptor acceptor = selector.new Acceptor(server); selector.submit(acceptor); return acceptor; @@ -262,8 +247,6 @@ protected void accepted(SelectableChannel channel) throws IOException @Override protected void doStart() throws Exception { - _reservedThreadExecutor = new ReservedThreadExecutor(getExecutor(),_reservedThreads,this); - addBean(_reservedThreadExecutor,true); _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length); for (int i = 0; i < _selectors.length; i++) { @@ -301,9 +284,6 @@ protected void doStop() throws Exception removeBean(selector); } Arrays.fill(_selectors,null); - if (_reservedThreadExecutor!=null) - removeBean(_reservedThreadExecutor); - _reservedThreadExecutor = null; if (_lease != null) _lease.close(); } diff --git a/jetty-osgi/test-jetty-osgi/src/test/config/etc/jetty-http2.xml b/jetty-osgi/test-jetty-osgi/src/test/config/etc/jetty-http2.xml index 1d6423589e36..2de7f348d0ae 100644 --- a/jetty-osgi/test-jetty-osgi/src/test/config/etc/jetty-http2.xml +++ b/jetty-osgi/test-jetty-osgi/src/test/config/etc/jetty-http2.xml @@ -12,7 +12,6 @@ - diff --git a/jetty-server/src/main/config/etc/jetty-http.xml b/jetty-server/src/main/config/etc/jetty-http.xml index f48e48636264..cb0618df3eec 100644 --- a/jetty-server/src/main/config/etc/jetty-http.xml +++ b/jetty-server/src/main/config/etc/jetty-http.xml @@ -42,7 +42,6 @@ - diff --git a/jetty-server/src/main/config/etc/jetty-ssl.xml b/jetty-server/src/main/config/etc/jetty-ssl.xml index 956809bda28e..5c563232d6b6 100644 --- a/jetty-server/src/main/config/etc/jetty-ssl.xml +++ b/jetty-server/src/main/config/etc/jetty-ssl.xml @@ -34,7 +34,6 @@ - diff --git a/jetty-server/src/main/config/etc/jetty-threadpool.xml b/jetty-server/src/main/config/etc/jetty-threadpool.xml index c653f0be8acd..dda4c4e773ea 100644 --- a/jetty-server/src/main/config/etc/jetty-threadpool.xml +++ b/jetty-server/src/main/config/etc/jetty-threadpool.xml @@ -22,6 +22,7 @@ + diff --git a/jetty-server/src/main/config/etc/jetty.xml b/jetty-server/src/main/config/etc/jetty.xml index 2e0f21bb4a59..9f93b35c7398 100644 --- a/jetty-server/src/main/config/etc/jetty.xml +++ b/jetty-server/src/main/config/etc/jetty.xml @@ -24,7 +24,6 @@ - diff --git a/jetty-server/src/main/config/modules/http.mod b/jetty-server/src/main/config/modules/http.mod index 57ad1c82b282..c288d6ab5d9e 100644 --- a/jetty-server/src/main/config/modules/http.mod +++ b/jetty-server/src/main/config/modules/http.mod @@ -40,9 +40,6 @@ etc/jetty-http.xml ## Thread priority delta to give to acceptor threads # jetty.http.acceptorPriorityDelta=0 -## Reserve threads for high priority tasks (-1 use a heuristic, 0 no reserved threads) -# jetty.http.reservedThreads=-1 - ## Connect Timeout in milliseconds # jetty.http.connectTimeout=15000 diff --git a/jetty-server/src/main/config/modules/ssl.mod b/jetty-server/src/main/config/modules/ssl.mod index 109aaba460bc..b9e530f3d254 100644 --- a/jetty-server/src/main/config/modules/ssl.mod +++ b/jetty-server/src/main/config/modules/ssl.mod @@ -44,9 +44,6 @@ basehome:modules/ssl/keystore|etc/keystore ## Thread priority delta to give to acceptor threads # jetty.ssl.acceptorPriorityDelta=0 -## Preallocated producer threads (0 disables EatWhatYouKill scheduling) -# jetty.ssl.reservedThreads=-1 - ## Connect Timeout in milliseconds # jetty.ssl.connectTimeout=15000 diff --git a/jetty-server/src/main/config/modules/threadpool.mod b/jetty-server/src/main/config/modules/threadpool.mod index 58d3e3a30484..591ddebb94fb 100644 --- a/jetty-server/src/main/config/modules/threadpool.mod +++ b/jetty-server/src/main/config/modules/threadpool.mod @@ -13,6 +13,9 @@ etc/jetty-threadpool.xml ## Maximum Number of Threads #jetty.threadPool.maxThreads=200 +## Number of reserved threads (-1 for heuristic) +# jetty.threadPool.reservedThreads=-1 + ## Thread Idle Timeout (in milliseconds) #jetty.threadPool.idleTimeout=60000 diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 1554c60afab2..3541f1c4708e 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -36,7 +36,6 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.Name; -import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; @@ -46,7 +45,7 @@ import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @ManagedObject("A thread pool") -public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable +public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); @@ -61,6 +60,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private int _idleTimeout; private int _maxThreads; private int _minThreads; + private int _reservedThreads = -1; + private TryExecutor _tryExecutor = TryExecutor.NO_TRY; private int _priority = Thread.NORM_PRIORITY; private boolean _daemon = false; private boolean _detailedDump = false; @@ -93,12 +94,17 @@ public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") } public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup) + { + this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup); + } + + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup) { setMinThreads(minThreads); setMaxThreads(maxThreads); setIdleTimeout(idleTimeout); setStopTimeout(5000); - + setReservedThreads(reservedThreads); if (queue==null) { int capacity=Math.max(_minThreads, 8); @@ -106,7 +112,7 @@ public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") } _jobs=queue; _threadGroup=threadGroup; - _budget=new ThreadPoolBudget(this); + setThreadPoolBudget(new ThreadPoolBudget(this)); } @Override @@ -125,15 +131,21 @@ public void setThreadPoolBudget(ThreadPoolBudget budget) @Override protected void doStart() throws Exception { + _tryExecutor = new ReservedThreadExecutor(this,_reservedThreads); + addBean(_tryExecutor); + super.doStart(); _threadsStarted.set(0); - startThreads(_minThreads); + startThreads(_minThreads); } @Override protected void doStop() throws Exception { + removeBean(_tryExecutor); + _tryExecutor = TryExecutor.NO_TRY; + super.doStop(); long timeout = getStopTimeout(); @@ -222,7 +234,6 @@ public void setDaemon(boolean daemon) * Set the maximum thread idle time. * Threads that are idle for longer than this period may be * stopped. - * Delegated to the named or anonymous Pool. * * @param idleTimeout Max idle time in ms. * @see #getIdleTimeout @@ -234,7 +245,6 @@ public void setIdleTimeout(int idleTimeout) /** * Set the maximum number of threads. - * Delegated to the named or anonymous Pool. * * @param maxThreads maximum number of threads. * @see #getMaxThreads @@ -249,7 +259,6 @@ public void setMaxThreads(int maxThreads) /** * Set the minimum number of threads. - * Delegated to the named or anonymous Pool. * * @param minThreads minimum number of threads * @see #getMinThreads @@ -266,6 +275,19 @@ public void setMinThreads(int minThreads) if (isStarted() && threads < _minThreads) startThreads(_minThreads - threads); } + + /** + * Set the number of reserved threads. + * + * @param reservedThreads number of reserved threads or -1 for heuristically determined + * @see #getReservedThreads + */ + public void setReservedThreads(int reservedThreads) + { + if (isRunning()) + throw new IllegalStateException(getState()); + _reservedThreads = reservedThreads; + } /** * @param name Name of this thread pool to use when naming threads. @@ -289,7 +311,6 @@ public void setThreadsPriority(int priority) /** * Get the maximum thread idle time. - * Delegated to the named or anonymous Pool. * * @return Max idle time in ms. * @see #setIdleTimeout @@ -302,7 +323,6 @@ public int getIdleTimeout() /** * Get the maximum number of threads. - * Delegated to the named or anonymous Pool. * * @return maximum number of threads. * @see #setMaxThreads @@ -316,7 +336,6 @@ public int getMaxThreads() /** * Get the minimum number of threads. - * Delegated to the named or anonymous Pool. * * @return minimum number of threads. * @see #setMinThreads @@ -328,6 +347,20 @@ public int getMinThreads() return _minThreads; } + /** + * Get the number of reserved threads. + * + * @return number of reserved threads or or -1 for heuristically determined + * @see #setReservedThreads + */ + @ManagedAttribute("the number of reserved threads in the pool") + public int getReservedThreads() + { + if (isStarted()) + return getBean(ReservedThreadExecutor.class).getCapacity(); + return _reservedThreads; + } + /** * @return The name of the this thread pool */ @@ -409,6 +442,13 @@ public void execute(Runnable job) } } + @Override + public boolean tryExecute(Runnable task) + { + TryExecutor tryExecutor = _tryExecutor; + return tryExecutor!=null && tryExecutor.tryExecute(task); + } + /** * Blocks until the thread pool is {@link LifeCycle#stop stopped}. */ @@ -509,13 +549,6 @@ protected Thread newThread(Runnable runnable) return new Thread(_threadGroup, runnable); } - @Override - @ManagedOperation("dumps thread pool state") - public String dump() - { - return ContainerLifeCycle.dump(this); - } - @Override public void dump(Appendable out, String indent) throws IOException { @@ -585,14 +618,21 @@ public String dump() if (isDetailedDump()) jobs = new ArrayList<>(getQueue()); - ContainerLifeCycle.dumpObject(out, this); - ContainerLifeCycle.dump(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs",jobs))); + dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs", jobs))); } @Override public String toString() { - return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); + return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d,r=%s}", + _name, + getState(), + getMinThreads(), + getThreads(), + getMaxThreads(), + getIdleThreads(), + _jobs.size(), + _tryExecutor); } private Runnable idleJobPoll() throws InterruptedException diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 340899dec601..639f2ca94164 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -42,7 +42,7 @@ * whenever it has been idle for that period. */ @ManagedObject("A pool for reserved threads") -public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor +public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor { private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); private static final Runnable STOP = new Runnable() @@ -66,42 +66,24 @@ public String toString() private final AtomicInteger _pending = new AtomicInteger(); private ThreadPoolBudget.Lease _lease; - private Object _owner; private long _idleTime = 1L; private TimeUnit _idleTimeUnit = TimeUnit.MINUTES; - public ReservedThreadExecutor(Executor executor) - { - this(executor,1); - } - - /** - * @param executor The executor to use to obtain threads - * @param capacity The number of threads to preallocate. If less than 0 then capacity - * is calculated based on a heuristic from the number of available processors and - * thread pool size. - */ - public ReservedThreadExecutor(Executor executor, int capacity) - { - this(executor,capacity,null); - } - /** * @param executor The executor to use to obtain threads * @param capacity The number of threads to preallocate. If less than 0 then capacity * is calculated based on a heuristic from the number of available processors and * thread pool size. - * @param owner the owner of the instance. Only used for debugging purpose withing the {@link #toString()} method */ - public ReservedThreadExecutor(Executor executor,int capacity, Object owner) + public ReservedThreadExecutor(Executor executor,int capacity) { _executor = executor; _capacity = reservedThreads(executor,capacity); _stack = new ConcurrentLinkedDeque<>(); - _owner = owner; LOG.debug("{}",this); } + /** * @param executor The executor to use to obtain threads * @param capacity The number of threads to preallocate, If less than 0 then capacity @@ -110,7 +92,7 @@ public ReservedThreadExecutor(Executor executor,int capacity, Object owner) * @return the number of reserved threads that would be used by a ReservedThreadExecutor * constructed with these arguments. */ - public static int reservedThreads(Executor executor,int capacity) + private static int reservedThreads(Executor executor,int capacity) { if (capacity>=0) return capacity; @@ -118,7 +100,7 @@ public static int reservedThreads(Executor executor,int capacity) if (executor instanceof ThreadPool.SizedThreadPool) { int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); - return Math.max(1, Math.min(cpus, threads / 8)); + return Math.max(1, Math.min(cpus, threads / 10)); } return cpus; } @@ -193,8 +175,7 @@ public void doStop() throws Exception @Override public void execute(Runnable task) throws RejectedExecutionException { - if (!tryExecute(task)) - throw new RejectedExecutionException(); + _executor.execute(task); } /** @@ -255,13 +236,12 @@ private void startReservedThread() @Override public String toString() { - return String.format("%s@%x{s=%d/%d,p=%d}@%s", + return String.format("%s@%x{s=%d/%d,p=%d}", getClass().getSimpleName(), hashCode(), _size.get(), _capacity, - _pending.get(), - _owner); + _pending.get()); } private class ReservedThread implements Runnable diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java new file mode 100644 index 000000000000..9919a09a15e7 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java @@ -0,0 +1,72 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +/** + * A variation of Executor that can confirm if a thread is available immediately + */ +public interface TryExecutor extends Executor +{ + /** + * Attempt to execute a task. + * @param task The task to be executed + * @return True IFF the task has been given directly to a thread to execute. The task cannot be queued pending the later availability of a Thread. + */ + boolean tryExecute(Runnable task); + + default void execute(Runnable task) + { + if (!tryExecute(task)) + throw new RejectedExecutionException(); + } + + public static TryExecutor asTryExecutor(Executor executor) + { + if (executor instanceof TryExecutor) + return (TryExecutor)executor; + return new NoTryExecutor(executor); + } + + public static class NoTryExecutor implements TryExecutor + { + private final Executor executor; + + public NoTryExecutor(Executor executor) + { + this.executor = executor; + } + + @Override + public void execute(Runnable task) + { + executor.execute(task); + } + + @Override + public boolean tryExecute(Runnable task) + { + return false; + } + } + + public static final TryExecutor NO_TRY = task -> false; +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 798a07ce013a..ff4636f01f8d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -34,7 +34,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; -import org.eclipse.jetty.util.thread.ReservedThreadExecutor; +import org.eclipse.jetty.util.thread.TryExecutor; /** *

A strategy where the thread that produces will run the resulting task if it @@ -73,25 +73,16 @@ private enum State { IDLE, PENDING, PRODUCING, REPRODUCING } private final LongAdder _executed = new LongAdder(); private final Producer _producer; private final Executor _executor; - private final ReservedThreadExecutor _producers; + private final TryExecutor _tryExecutor; private State _state = State.IDLE; public EatWhatYouKill(Producer producer, Executor executor) - { - this(producer,executor,new ReservedThreadExecutor(executor,1)); - } - - public EatWhatYouKill(Producer producer, Executor executor, int maxReserved) - { - this(producer,executor,new ReservedThreadExecutor(executor,maxReserved)); - } - - public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) { _producer = producer; _executor = executor; - _producers = producers; + _tryExecutor = TryExecutor.asTryExecutor(executor); addBean(_producer); + addBean(_tryExecutor); if (LOG.isDebugEnabled()) LOG.debug("{} created", this); } @@ -214,7 +205,7 @@ private void doProduce() { synchronized(this) { - if (_producers.tryExecute(this)) + if (_tryExecutor.tryExecute(this)) { // EXECUTE PRODUCE CONSUME! // We have executed a new Producer, so we can EWYK consume @@ -334,7 +325,7 @@ private void getState(StringBuilder builder) { builder.append(_state); builder.append('/'); - builder.append(_producers); + builder.append(_tryExecutor); builder.append("[nb="); builder.append(getNonBlockingTasksConsumed()); builder.append(",c=");