From 249869bc49784d088f9d457a5b7e64142db6c0b6 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 13 Sep 2017 12:46:15 +1000 Subject: [PATCH] Issue #1806 Shrink reserved thread as stack --- .../util/thread/ReservedThreadExecutor.java | 103 +++++++++--------- .../thread/ReservedThreadExecutorTest.java | 10 +- 2 files changed, 56 insertions(+), 57 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 8f01b0e4b20f..c4645a548499 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -44,16 +44,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); - private static final Runnable SHRINK = ()->{}; private final Executor _executor; private final Locker _locker = new Locker(); - private final ReservedThread[] _queue; - private int _head; + private final ReservedThread[] _stack; private int _size; private int _pending; private long _idleTime = 1L; private TimeUnit _idleTimeUnit = TimeUnit.MINUTES; - private long _notIdleNanos = System.nanoTime(); public ReservedThreadExecutor(Executor executor) { @@ -84,7 +81,7 @@ public ReservedThreadExecutor(Executor executor,int capacity) } } - _queue = new ReservedThread[capacity]; + _stack = new ReservedThread[capacity]; } public Executor getExecutor() @@ -95,7 +92,7 @@ public Executor getExecutor() @ManagedAttribute(value = "max number of reserved threads", readonly = true) public int getCapacity() { - return _queue.length; + return _stack.length; } @ManagedAttribute(value = "available reserved threads", readonly = true) @@ -136,10 +133,8 @@ public void doStop() throws Exception { while (_size>0) { - ReservedThread thread = _queue[_head]; - _queue[_head] = null; - _head = (_head+1)%_queue.length; - _size--; + ReservedThread thread = _stack[--_size]; + _stack[_size] = null; thread._wakeup.signal(); } } @@ -163,11 +158,9 @@ public boolean tryExecute(Runnable task) try (Locker.Lock lock = _locker.lockIfNotHeld()) { - _notIdleNanos = System.nanoTime(); - if (_size==0) { - if (_pending<_queue.length) + if (_pending<_stack.length) { _executor.execute(new ReservedThread()); _pending++; @@ -175,12 +168,10 @@ public boolean tryExecute(Runnable task) return false; } - ReservedThread thread = _queue[_head]; - _queue[_head] = null; - _head = (_head+1)%_queue.length; - _size--; + ReservedThread thread = _stack[--_size]; + _stack[_size] = null; - if (_size==0 && _pending<_queue.length) + if (_size==0 && _pending<_stack.length) { _executor.execute(new ReservedThread()); _pending++; @@ -207,28 +198,39 @@ public String toString() } } - private class ReservedThread implements Runnable { private Condition _wakeup = null; private Runnable _task = null; - private void reservedWait() throws InterruptedException + private Runnable reservedWait() { - if (_idleTime>0) + while (_task==null) { - while (true) + if (!isRunning()) + return null; + try + { + if (_idleTime==0) + _wakeup.await(); + else + { + if (!_wakeup.await(_idleTime, _idleTimeUnit)) + { + // We are idle, so we are no longer needed + return null; + } + } + } + catch(InterruptedException e) { - if (_wakeup.await(_idleTime, _idleTimeUnit)) - return; - long now = System.nanoTime(); - long period = now - _notIdleNanos; - if (period > _idleTimeUnit.toNanos(_idleTime)) - execute(SHRINK); + LOG.ignore(e); } } - _wakeup.await(); + Runnable task = _task; + _task = null; + return task; } @Override @@ -247,37 +249,36 @@ public void run() } // Exit if no longer running or there now too many preallocated threads - if (!isRunning() || _size>=_queue.length) + if (!isRunning() || _size>=_stack.length) break; - // Insert ourselves in the queue - _queue[(_head+_size++)%_queue.length] = this; + // Insert ourselves in the stack + _stack[_size++] = this; + + // Wait for a task + if (LOG.isDebugEnabled()) + LOG.debug("{} waiting", this); + task = reservedWait(); + if (LOG.isDebugEnabled()) + LOG.debug("{} woken up {}", this, task); - // Wait for a task, ignoring spurious wakeups - while (isRunning() && task==null) + // If no task, we are not needed anymore + if (task==null && isRunning()) { - try + for (int i=0; i<_size; i++) { - if (LOG.isDebugEnabled()) - LOG.debug("{} waiting", this); - reservedWait(); - if (LOG.isDebugEnabled()) - LOG.debug("{} woken up", this); - task = _task; - _task = null; - } - catch (InterruptedException e) - { - LOG.ignore(e); + if (_stack[i]==this) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} shrink {}", this, i); + System.arraycopy(_stack, i + 1, _stack, i, --_size); + return; + } } + throw new IllegalStateException(); } } - // Handle shrink poison pill - if (task==SHRINK) - return; - - // Run any task if (task!=null) { try diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index 716cd55ee0b8..94d8cca10113 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -156,20 +156,18 @@ public void testShrink() throws Exception latch.countDown(); waitForAvailable(2); - // Check that regular activity keeps the pool size + // Check that regular moderate activity keeps the pool a moderate size TimeUnit.MILLISECONDS.sleep(IDLE/2); assertThat(_reservedExecutor.tryExecute(NOOP),is(true)); waitForAvailable(2); TimeUnit.MILLISECONDS.sleep(IDLE/2); assertThat(_reservedExecutor.tryExecute(NOOP),is(true)); - waitForAvailable(2); + waitForAvailable(1); TimeUnit.MILLISECONDS.sleep(IDLE/2); assertThat(_reservedExecutor.tryExecute(NOOP),is(true)); - waitForAvailable(2); + waitForAvailable(1); - // check that an idle period reduces size by 1 - TimeUnit.MILLISECONDS.sleep(IDLE + IDLE/2); - assertThat(_reservedExecutor.getAvailable(),is(1)); + // check fully idle goes to zero TimeUnit.MILLISECONDS.sleep(IDLE); assertThat(_reservedExecutor.getAvailable(),is(0));