diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
index 81d41d0c7a2a..42446e9b37aa 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
@@ -14,6 +14,7 @@
package org.eclipse.jetty.util.thread;
import java.io.IOException;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -21,6 +22,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
@@ -38,14 +40,14 @@
/**
- * An Executor using pre-allocated/reserved Threads from a wrapped Executor.
- *
Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
- * with a Thread immediately being assigned the Runnable task, or fail if no Thread is
- * available.
- *
Threads are reserved lazily, with a new reserved threads being allocated from the
- * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
+ *
A TryExecutor using pre-allocated/reserved threads from an external Executor.
+ * Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either
+ * succeed with a reserved thread immediately being assigned the task, or fail if
+ * no reserved thread is available.
+ * Threads are reserved lazily, with new reserved threads being allocated from the external
+ * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
- * the executor.
+ * the external Executor.
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
@@ -62,7 +64,7 @@ public void run()
@Override
public String toString()
{
- return "STOP!";
+ return "STOP";
}
};
@@ -72,7 +74,6 @@ public String toString()
private final SynchronousQueue _queue = new SynchronousQueue<>(false);
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
-
private ThreadPoolBudget.Lease _lease;
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
@@ -175,20 +176,25 @@ public void doStop() throws Exception
super.doStop();
- // Offer STOP task to all waiting reserved threads.
- for (int i = _count.getAndSetLo(-1); i-- > 0;)
+ // Mark this instance as stopped.
+ int size = _count.getAndSetLo(-1);
+
+ // Offer the STOP task to all waiting reserved threads.
+ for (int i = 0; i < size; ++i)
{
- // yield to wait for any reserved threads that have incremented the size but not yet polled
+ // Yield to wait for any reserved threads that
+ // have incremented the size but not yet polled.
Thread.yield();
_queue.offer(STOP);
}
- // Interrupt any reserved thread missed the offer so it doesn't wait too long.
- for (ReservedThread reserved : _threads)
- {
- Thread thread = reserved._thread;
- if (thread != null)
- thread.interrupt();
- }
+
+ // Interrupt any reserved thread missed the offer,
+ // so they do not wait for the whole idle timeout.
+ _threads.stream()
+ .filter(ReservedThread::isReserved)
+ .map(t -> t._thread)
+ .filter(Objects::nonNull)
+ .forEach(Thread::interrupt);
_threads.clear();
_count.getAndSetHi(0);
}
@@ -264,13 +270,16 @@ private void startReservedThread()
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
- new DumpableCollection("reserved", _threads));
+ new DumpableCollection("threads",
+ _threads.stream()
+ .filter(ReservedThread::isReserved)
+ .collect(Collectors.toList())));
}
@Override
public String toString()
{
- return String.format("%s@%x{s=%d/%d,p=%d}",
+ return String.format("%s@%x{reserved=%d/%d,pending=%d}",
getClass().getSimpleName(),
hashCode(),
_count.getLo(),
@@ -293,6 +302,11 @@ private class ReservedThread implements Runnable
private volatile State _state = State.PENDING;
private volatile Thread _thread;
+ private boolean isReserved()
+ {
+ return _state == State.RESERVED;
+ }
+
private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
@@ -321,7 +335,6 @@ private Runnable reservedWait()
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;
-
}
catch (InterruptedException e)
{
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java
index bb6d3a5af43e..c065eb38578a 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java
@@ -831,7 +831,7 @@ public void testDump() throws Exception
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
- assertThat(dump, containsString("s=0/2"));
+ assertThat(dump, containsString("reserved=0/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(2));
assertThat(count(dump, " WAITING"), is(1));
@@ -846,7 +846,7 @@ public void testDump() throws Exception
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
- assertThat(dump, containsString("s=1/2"));
+ assertThat(dump, containsString("reserved=1/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(1));
assertThat(count(dump, " WAITING"), is(1));
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
index e3a08b876bf3..58835091022b 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
@@ -359,6 +359,5 @@ public void run()
assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
- // System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
}
}