From 1f18ad1be7077c7d0894d87e7f789f19f9956bdd Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 25 Sep 2019 12:01:44 +1000 Subject: [PATCH] Issue #4105 - starting a thread in QTP now increments idle count Signed-off-by: Lachlan Roberts --- .../jetty/util/thread/QueuedThreadPool.java | 21 ++-- .../util/thread/QueuedThreadPoolTest.java | 107 ++++++++++++++++++ 2 files changed, 115 insertions(+), 13 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 41f02b80e8bf..6a110badd5d8 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -176,7 +176,6 @@ protected void doStop() throws Exception removeBean(_tryExecutor); _tryExecutor = TryExecutor.NO_TRY; - // Signal the Runner threads that we are stopping int threads = _counts.getAndSetHi(Integer.MIN_VALUE); @@ -483,7 +482,7 @@ public void setLowThreadsThreshold(int lowThreadsThreshold) public void execute(Runnable job) { // Determine if we need to start a thread, use and idle thread or just queue this job - boolean startThread; + int startThread; while (true) { // Get the atomic counts @@ -501,10 +500,10 @@ public void execute(Runnable job) // Start a thread if we have insufficient idle threads to meet demand // and we are not at max threads. - startThread = (idle <= 0 && threads < _maxThreads); + startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0; // The job will be run by an idle thread when available - if (!_counts.compareAndSet(counts, threads + (startThread ? 1 : 0), idle - 1)) + if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1)) continue; break; @@ -513,7 +512,7 @@ public void execute(Runnable job) if (!_jobs.offer(job)) { // reverse our changes to _counts. - if (addCounts(startThread ? -1 : 0, 1)) + if (addCounts(-startThread, 1 - startThread)) LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } @@ -522,7 +521,7 @@ public void execute(Runnable job) LOG.debug("queue {} startThread={}", job, startThread); // Start a thread if one was needed - if (startThread) + while (startThread-- > 0) startThread(); } @@ -617,7 +616,7 @@ private void ensureThreads() if (threads < _minThreads || (idle < 0 && threads < _maxThreads)) { // Then try to start a thread. - if (_counts.compareAndSet(counts, threads + 1, idle)) + if (_counts.compareAndSet(counts, threads + 1, idle + 1)) startThread(); // Otherwise continue to check state again. continue; @@ -645,7 +644,7 @@ protected void startThread() finally { if (!started) - addCounts(-1, 0); // threads, idle + addCounts(-1, -1); // threads, idle } } @@ -859,13 +858,8 @@ public void run() LOG.debug("Runner started for {}", QueuedThreadPool.this); Runnable job = null; - try { - // All threads start idle (not yet taken a job) - if (!addCounts(0, 1)) - return; - while (true) { // If we had a job, signal that we are idle again @@ -873,6 +867,7 @@ public void run() { if (!addCounts(0, 1)) break; + job = null; } // else check we are still running else if (_counts.getHi() == Integer.MIN_VALUE) diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index edcd78a81377..5d9ec28eed83 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -45,6 +45,61 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest private static final Logger LOG = Log.getLogger(QueuedThreadPoolTest.class); private final AtomicInteger _jobs = new AtomicInteger(); + private static class TestQueuedThreadPool extends QueuedThreadPool + { + private final AtomicInteger _started; + private final CountDownLatch _enteredRemoveThread; + private final CountDownLatch _exitRemoveThread; + + public TestQueuedThreadPool(AtomicInteger started, CountDownLatch enteredRemoveThread, CountDownLatch exitRemoveThread) + { + _started = started; + _enteredRemoveThread = enteredRemoveThread; + _exitRemoveThread = exitRemoveThread; + } + + public void superStartThread() + { + super.startThread(); + } + + @Override + protected void startThread() + { + switch (_started.incrementAndGet()) + { + case 1: + case 2: + case 3: + super.startThread(); + break; + + case 4: + // deliberately not start thread + break; + + default: + throw new IllegalStateException("too many threads started"); + } + } + + @Override + protected void removeThread(Thread thread) + { + try + { + _enteredRemoveThread.countDown(); + _exitRemoveThread.await(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + super.removeThread(thread); + } + } + private class RunningJob implements Runnable { final CountDownLatch _run = new CountDownLatch(1); @@ -450,6 +505,58 @@ public void testShrink() throws Exception tp.stop(); } + @Test + public void testEnsureThreads() throws Exception + { + AtomicInteger started = new AtomicInteger(0); + + CountDownLatch enteredRemoveThread = new CountDownLatch(1); + CountDownLatch exitRemoveThread = new CountDownLatch(1); + TestQueuedThreadPool tp = new TestQueuedThreadPool(started, enteredRemoveThread, exitRemoveThread); + + tp.setMinThreads(2); + tp.setMaxThreads(10); + tp.setIdleTimeout(400); + tp.setThreadsPriority(Thread.NORM_PRIORITY - 1); + + tp.start(); + waitForIdle(tp, 2); + waitForThreads(tp, 2); + + RunningJob job1 = new RunningJob(); + RunningJob job2 = new RunningJob(); + RunningJob job3 = new RunningJob(); + RunningJob job4 = new RunningJob(); + + tp.execute(job1); + tp.execute(job2); + tp.execute(job3); + + waitForThreads(tp, 3); + waitForIdle(tp, 0); + + job3.stop(); + assertTrue(enteredRemoveThread.await(5, TimeUnit.SECONDS)); + waitForThreads(tp, 3); + waitForIdle(tp, 1); + + // we execute job4 we have an idle thread which is stopping so we fail to start new thread + tp.execute(job4); + + // complete the stop + exitRemoveThread.countDown(); + Thread.sleep(1000); + + assertThat(started.get(), is(4)); + tp.superStartThread(); + assertTrue(job4._run.await(5, TimeUnit.SECONDS)); + + job1.stop(); + job2.stop(); + job4.stop(); + tp.stop(); + } + @Test public void testMaxStopTime() throws Exception {