Skip to content

Commit

Permalink
Jetty 9.4.x 4105 4121 4122 queued thread pool (#4146)
Browse files Browse the repository at this point in the history
Several QTP fixes:

* #4105 Threads without jobs now check if they should idle die before waiting rather than before, this allows idling under steady load. 3ad6780
* #4121 ThreadFactory behaviour supported by doing thread config within newThread call. 7b306d7
* #4122 Always clear the interrupted status. c37a4ff
   task = queue.poll(timeout);

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Oct 2, 2019
1 parent 7810f2d commit 813fcb7
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 59 deletions.
Expand Up @@ -55,8 +55,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
/**
* Encodes thread counts:
* <dl>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size</dd>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
* this represents the effective number of idle threads, and if negative it represents the
* demand for more threads</dd>
* </dl>
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
Expand Down Expand Up @@ -159,6 +161,8 @@ protected void doStart() throws Exception
}
addBean(_tryExecutor);

_lastShrink.set(System.nanoTime());

super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle
Expand Down Expand Up @@ -290,6 +294,9 @@ public void setDaemon(boolean daemon)
public void setIdleTimeout(int idleTimeout)
{
_idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -443,7 +450,9 @@ public int getThreadsPriority()
@ManagedAttribute("size of the job queue")
public int getQueueSize()
{
return _jobs.size();
// The idle counter encodes demand, which is the effective queue size
int idle = _counts.getLo();
return Math.max(0, -idle);
}

/**
Expand Down Expand Up @@ -631,9 +640,6 @@ protected void startThread()
try
{
Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
Expand Down Expand Up @@ -665,7 +671,11 @@ private boolean addCounts(int deltaThreads, int deltaIdle)

protected Thread newThread(Runnable runnable)
{
return new Thread(_threadGroup, runnable);
Thread thread = new Thread(_threadGroup, runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
return thread;
}

protected void removeThread(Thread thread)
Expand Down Expand Up @@ -857,17 +867,19 @@ public void run()
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);

Runnable job = null;
boolean idle = true;
try
{
Runnable job = null;
while (true)
{
// If we had a job, signal that we are idle again
// If we had a job,
if (job != null)
{
// signal that we are idle again
if (!addCounts(0, 1))
break;
job = null;
idle = true;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
Expand All @@ -881,42 +893,37 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
job = _jobs.poll();
if (job == null)
{
// Wait for a job
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout) && _lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}

// Wait for a job, only after we have checked if we should shrink
job = idleJobPoll(idleTimeout);

// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
}
// continue to try again
continue;
}
}

idle = false;

// run job
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);

// Clear any interrupted status
Thread.interrupted();
}
catch (InterruptedException e)
{
Expand All @@ -928,6 +935,11 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
{
LOG.warn(e);
}
finally
{
// Clear any interrupted status
Thread.interrupted();
}
}
}
finally
Expand All @@ -936,7 +948,7 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
removeThread(thread);

// Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0);
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

Expand Down
Expand Up @@ -28,14 +28,13 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -188,7 +187,7 @@ public void testThreadPool() throws Exception
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(4);
tp.setIdleTimeout(900);
tp.setIdleTimeout(1000);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);

tp.start();
Expand All @@ -199,44 +198,49 @@ public void testThreadPool() throws Exception

// Doesn't shrink to less than min threads
Thread.sleep(3 * tp.getIdleTimeout() / 2);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(2));

// Run job0
RunningJob job0 = new RunningJob("JOB0");
tp.execute(job0);
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(1));

// Run job1
RunningJob job1 = new RunningJob("JOB1");
tp.execute(job1);
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 2);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(0));

// Run job2
RunningJob job2 = new RunningJob("JOB2");
tp.execute(job2);
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(3));
assertThat(tp.getIdleThreads(), is(0));

// Run job3
RunningJob job3 = new RunningJob("JOB3");
tp.execute(job3);
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));

// Check no short term change
Thread.sleep(100);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));

// Run job4. will be queued
RunningJob job4 = new RunningJob("JOB4");
tp.execute(job4);
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
assertFalse(job4._run.await(250, TimeUnit.MILLISECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(1));

// finish job 0
job0._stopping.countDown();
Expand All @@ -246,12 +250,13 @@ public void testThreadPool() throws Exception
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));

// finish job 1
// finish job 1, and it's thread will become idle
job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(4));
waitForThreads(tp, 4);

// finish job 2,3,4
job2._stopping.countDown();
Expand All @@ -261,15 +266,9 @@ public void testThreadPool() throws Exception
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));

waitForIdle(tp, 4);
assertThat(tp.getThreads(), is(4));

long duration = System.nanoTime();
waitForThreads(tp, 3);
assertThat(tp.getIdleThreads(), is(3));
duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
// Eventually all will have 3 idle threads
waitForIdle(tp, 3);
assertThat(tp.getThreads(), is(3));

tp.stop();
}
Expand Down Expand Up @@ -505,6 +504,58 @@ public void testShrink() throws Exception
tp.stop();
}

@Test
public void testSteadyShrink() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
{
try
{
latch.await();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
};

QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(10);
int timeout = 500;
tp.setIdleTimeout(timeout);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);

tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);

for (int i = 0; i < 10; i++)
tp.execute(job);

waitForThreads(tp, 10);
int threads = tp.getThreads();
// let the jobs run
latch.countDown();

for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}

// Assert that steady rate of jobs doesn't prevent some idling out
assertThat(tp.getThreads(), lessThan(threads));
threads = tp.getThreads();
for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}
assertThat(tp.getThreads(), lessThan(threads));
}

@Test
public void testEnsureThreads() throws Exception
{
Expand Down Expand Up @@ -605,7 +656,7 @@ private void waitForIdle(QueuedThreadPool tp, int idle)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(idle, tp.getIdleThreads());
assertThat(tp.getIdleThreads(), is(idle));
}

private void waitForReserved(QueuedThreadPool tp, int reserved)
Expand All @@ -624,7 +675,7 @@ private void waitForReserved(QueuedThreadPool tp, int reserved)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(reserved, reservedThreadExecutor.getAvailable());
assertThat(reservedThreadExecutor.getAvailable(), is(reserved));
}

private void waitForThreads(QueuedThreadPool tp, int threads)
Expand All @@ -642,7 +693,7 @@ private void waitForThreads(QueuedThreadPool tp, int threads)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(threads, tp.getThreads());
assertThat(tp.getThreads(), is(threads));
}

@Test
Expand Down

0 comments on commit 813fcb7

Please sign in to comment.