Skip to content

Commit

Permalink
Issue #1806 Shrink reserved thread as stack
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Sep 13, 2017
1 parent 8c460ae commit 249869b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);

private static final Runnable SHRINK = ()->{};
private final Executor _executor;
private final Locker _locker = new Locker();
private final ReservedThread[] _queue;
private int _head;
private final ReservedThread[] _stack;
private int _size;
private int _pending;
private long _idleTime = 1L;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
private long _notIdleNanos = System.nanoTime();

public ReservedThreadExecutor(Executor executor)
{
Expand Down Expand Up @@ -84,7 +81,7 @@ public ReservedThreadExecutor(Executor executor,int capacity)
}
}

_queue = new ReservedThread[capacity];
_stack = new ReservedThread[capacity];
}

public Executor getExecutor()
Expand All @@ -95,7 +92,7 @@ public Executor getExecutor()
@ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity()
{
return _queue.length;
return _stack.length;
}

@ManagedAttribute(value = "available reserved threads", readonly = true)
Expand Down Expand Up @@ -136,10 +133,8 @@ public void doStop() throws Exception
{
while (_size>0)
{
ReservedThread thread = _queue[_head];
_queue[_head] = null;
_head = (_head+1)%_queue.length;
_size--;
ReservedThread thread = _stack[--_size];
_stack[_size] = null;
thread._wakeup.signal();
}
}
Expand All @@ -163,24 +158,20 @@ public boolean tryExecute(Runnable task)

try (Locker.Lock lock = _locker.lockIfNotHeld())
{
_notIdleNanos = System.nanoTime();

if (_size==0)
{
if (_pending<_queue.length)
if (_pending<_stack.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
return false;
}

ReservedThread thread = _queue[_head];
_queue[_head] = null;
_head = (_head+1)%_queue.length;
_size--;
ReservedThread thread = _stack[--_size];
_stack[_size] = null;

if (_size==0 && _pending<_queue.length)
if (_size==0 && _pending<_stack.length)
{
_executor.execute(new ReservedThread());
_pending++;
Expand All @@ -207,28 +198,39 @@ public String toString()
}
}


private class ReservedThread implements Runnable
{
private Condition _wakeup = null;
private Runnable _task = null;

private void reservedWait() throws InterruptedException
private Runnable reservedWait()
{
if (_idleTime>0)
while (_task==null)
{
while (true)
if (!isRunning())
return null;
try
{
if (_idleTime==0)
_wakeup.await();
else
{
if (!_wakeup.await(_idleTime, _idleTimeUnit))
{
// We are idle, so we are no longer needed
return null;
}
}
}
catch(InterruptedException e)
{
if (_wakeup.await(_idleTime, _idleTimeUnit))
return;
long now = System.nanoTime();
long period = now - _notIdleNanos;
if (period > _idleTimeUnit.toNanos(_idleTime))
execute(SHRINK);
LOG.ignore(e);
}
}

_wakeup.await();
Runnable task = _task;
_task = null;
return task;
}

@Override
Expand All @@ -247,37 +249,36 @@ public void run()
}

// Exit if no longer running or there now too many preallocated threads
if (!isRunning() || _size>=_queue.length)
if (!isRunning() || _size>=_stack.length)
break;

// Insert ourselves in the queue
_queue[(_head+_size++)%_queue.length] = this;
// Insert ourselves in the stack
_stack[_size++] = this;

// Wait for a task
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
task = reservedWait();
if (LOG.isDebugEnabled())
LOG.debug("{} woken up {}", this, task);

// Wait for a task, ignoring spurious wakeups
while (isRunning() && task==null)
// If no task, we are not needed anymore
if (task==null && isRunning())
{
try
for (int i=0; i<_size; i++)
{
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
reservedWait();
if (LOG.isDebugEnabled())
LOG.debug("{} woken up", this);
task = _task;
_task = null;
}
catch (InterruptedException e)
{
LOG.ignore(e);
if (_stack[i]==this)
{
if (LOG.isDebugEnabled())
LOG.debug("{} shrink {}", this, i);
System.arraycopy(_stack, i + 1, _stack, i, --_size);
return;
}
}
throw new IllegalStateException();
}
}

// Handle shrink poison pill
if (task==SHRINK)
return;

// Run any task
if (task!=null)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,18 @@ public void testShrink() throws Exception
latch.countDown();
waitForAvailable(2);

// Check that regular activity keeps the pool size
// Check that regular moderate activity keeps the pool a moderate size
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(2);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(2);
waitForAvailable(1);
TimeUnit.MILLISECONDS.sleep(IDLE/2);
assertThat(_reservedExecutor.tryExecute(NOOP),is(true));
waitForAvailable(2);
waitForAvailable(1);

// check that an idle period reduces size by 1
TimeUnit.MILLISECONDS.sleep(IDLE + IDLE/2);
assertThat(_reservedExecutor.getAvailable(),is(1));
// check fully idle goes to zero
TimeUnit.MILLISECONDS.sleep(IDLE);
assertThat(_reservedExecutor.getAvailable(),is(0));

Expand Down

0 comments on commit 249869b

Please sign in to comment.