Skip to content

Commit

Permalink
improvements after review
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Sep 26, 2017
1 parent 23df855 commit 51a3ac3
Showing 1 changed file with 102 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,48 +182,45 @@ public boolean tryExecute(Runnable task)
if (task==null)
return false;

try
ReservedThread thread = _stack.pop();
if (thread==null && task!=STOP)
{
ReservedThread thread = _stack.pop();
if (thread==null && task!=STOP)
{
startReservedThread();
return false;
}

if (_size.decrementAndGet()==0 && task!=STOP)
startReservedThread();
startReservedThread();
return false;
}

if (LOG.isDebugEnabled())
LOG.debug("{} execute {}", this, task);
int size = _size.decrementAndGet();
thread.offer(task);

thread.execute(task);
if (size==0 && task!=STOP)
startReservedThread();

return true;
}
catch(RejectedExecutionException e)
{
LOG.ignore(e);
return false;
}
return true;
}

private void startReservedThread()
{
while(true)
try
{
int pending = _pending.get();
if (pending>=_capacity)
return;
if (_pending.compareAndSet(pending,pending+1))
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending+1);
int pending = _pending.get();
if (pending >= _capacity)
return;
if (_pending.compareAndSet(pending, pending + 1))
{
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);

_executor.execute(new ReservedThread());
return;
_executor.execute(new ReservedThread());
return;
}
}
}
catch(RejectedExecutionException e)
{
LOG.ignore(e);
}
}

@Override
Expand All @@ -232,17 +229,18 @@ public String toString()
return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size.get(),_pending.get());
}


private class ReservedThread extends ConcurrentStack.Node implements Runnable, Executor
private class ReservedThread extends ConcurrentStack.Node implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
private boolean _starting = true;
private Runnable _task = null;

@Override
public void execute(Runnable task)
public void offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} offer {}", this, task);

try (Locker.Lock lock = _locker.lock())
{
_task = task;
Expand All @@ -252,38 +250,65 @@ public void execute(Runnable task)

public void stop()
{
execute(STOP);
offer(STOP);
}

private Runnable reservedWait()
{
while (_task==null && isRunning())
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);

Runnable task = null;
while (isRunning() && task==null)
{
try
boolean idle = false;

try (Locker.Lock lock = _locker.lock())
{
if (_idleTime==0)
_wakeup.await();
else if (!_wakeup.await(_idleTime, _idleTimeUnit))
if (_task == null)
{
// Because threads are held in a stack, excess threads will be
// idle. However, we cannot remove threads from the bottom of
// the stack, so we submit a poison pill job to stop the thread
// on top of the stack (which unfortunately will be the most
// recently used)
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE", this);
ReservedThreadExecutor.this.execute(STOP);
try
{
if (_idleTime == 0)
{
_wakeup.await();
task = _task;
_task = null;
}
else if (_wakeup.await(_idleTime, _idleTimeUnit))
{
task = _task;
_task = null;
}
else
{
idle = true;
}
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
}
catch(InterruptedException e)

if (idle)
{
LOG.ignore(e);
// Because threads are held in a stack, excess threads will be
// idle. However, we cannot remove threads from the bottom of
// the stack, so we submit a poison pill job to stop the thread
// on top of the stack (which unfortunately will be the most
// recently used)
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE", this);
ReservedThreadExecutor.this.tryExecute(STOP);
}
}

Runnable task = _task==null?STOP:_task;
_task = null;
return task;
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);

return task==null?STOP:task;
}

@Override
Expand All @@ -292,44 +317,39 @@ public void run()
while (isRunning())
{
Runnable task = null;
try (Locker.Lock lock = _locker.lock())
{
// Loop until size & pending updated
while(true)
{
int size = _size.get();
if (size>=_capacity)
{
if (_starting)
_pending.decrementAndGet();
return;
}
if (_size.compareAndSet(size,size+1))
{
if (_starting)
_pending.decrementAndGet();
break;
}
}

if (_starting)
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while(true)
{
int size = _size.get();
if (size>=_capacity)
{
if (LOG.isDebugEnabled())
LOG.debug("{} starting", this);
_starting = false;
LOG.debug("{} size {} > capacity", this, size, _capacity);
if (_starting)
_pending.decrementAndGet();
return;
}
if (_size.compareAndSet(size,size+1))
break;
}

// Insert ourselves in the stack
_stack.push(this);

// Wait for a task
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
task = reservedWait();
if (_starting)
{
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
LOG.debug("{} started", this);
_pending.decrementAndGet();
_starting = false;
}

// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.push(this);

// Wait for a task
task = reservedWait();

if (task==STOP)
// return on STOP poison pill
break;
Expand Down

0 comments on commit 51a3ac3

Please sign in to comment.