From 51a3ac37e63108ca93fe2fdd9e02e7f8161072ba Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 26 Sep 2017 10:08:55 +1000 Subject: [PATCH] improvements after review --- .../util/thread/ReservedThreadExecutor.java | 184 ++++++++++-------- 1 file changed, 102 insertions(+), 82 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 52a80cb50888..8607727d0901 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -182,48 +182,45 @@ public boolean tryExecute(Runnable task) if (task==null) return false; - try + ReservedThread thread = _stack.pop(); + if (thread==null && task!=STOP) { - ReservedThread thread = _stack.pop(); - if (thread==null && task!=STOP) - { - startReservedThread(); - return false; - } - - if (_size.decrementAndGet()==0 && task!=STOP) - startReservedThread(); + startReservedThread(); + return false; + } - if (LOG.isDebugEnabled()) - LOG.debug("{} execute {}", this, task); + int size = _size.decrementAndGet(); + thread.offer(task); - thread.execute(task); + if (size==0 && task!=STOP) + startReservedThread(); - return true; - } - catch(RejectedExecutionException e) - { - LOG.ignore(e); - return false; - } + return true; } private void startReservedThread() { - while(true) + try { - int pending = _pending.get(); - if (pending>=_capacity) - return; - if (_pending.compareAndSet(pending,pending+1)) + while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("{} startReservedThread p={}", this, pending+1); + int pending = _pending.get(); + if (pending >= _capacity) + return; + if (_pending.compareAndSet(pending, pending + 1)) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} startReservedThread p={}", this, pending + 1); - _executor.execute(new ReservedThread()); - return; + _executor.execute(new ReservedThread()); + return; + } } } + catch(RejectedExecutionException e) + { + LOG.ignore(e); + } } @Override @@ -232,17 +229,18 @@ public String toString() return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size.get(),_pending.get()); } - - private class ReservedThread extends ConcurrentStack.Node implements Runnable, Executor + private class ReservedThread extends ConcurrentStack.Node implements Runnable { private final Locker _locker = new Locker(); private final Condition _wakeup = _locker.newCondition(); private boolean _starting = true; private Runnable _task = null; - @Override - public void execute(Runnable task) + public void offer(Runnable task) { + if (LOG.isDebugEnabled()) + LOG.debug("{} offer {}", this, task); + try (Locker.Lock lock = _locker.lock()) { _task = task; @@ -252,38 +250,65 @@ public void execute(Runnable task) public void stop() { - execute(STOP); + offer(STOP); } private Runnable reservedWait() { - while (_task==null && isRunning()) + if (LOG.isDebugEnabled()) + LOG.debug("{} waiting", this); + + Runnable task = null; + while (isRunning() && task==null) { - try + boolean idle = false; + + try (Locker.Lock lock = _locker.lock()) { - if (_idleTime==0) - _wakeup.await(); - else if (!_wakeup.await(_idleTime, _idleTimeUnit)) + if (_task == null) { - // Because threads are held in a stack, excess threads will be - // idle. However, we cannot remove threads from the bottom of - // the stack, so we submit a poison pill job to stop the thread - // on top of the stack (which unfortunately will be the most - // recently used) - if (LOG.isDebugEnabled()) - LOG.debug("{} IDLE", this); - ReservedThreadExecutor.this.execute(STOP); + try + { + if (_idleTime == 0) + { + _wakeup.await(); + task = _task; + _task = null; + } + else if (_wakeup.await(_idleTime, _idleTimeUnit)) + { + task = _task; + _task = null; + } + else + { + idle = true; + } + } + catch (InterruptedException e) + { + LOG.ignore(e); + } } } - catch(InterruptedException e) + + if (idle) { - LOG.ignore(e); + // Because threads are held in a stack, excess threads will be + // idle. However, we cannot remove threads from the bottom of + // the stack, so we submit a poison pill job to stop the thread + // on top of the stack (which unfortunately will be the most + // recently used) + if (LOG.isDebugEnabled()) + LOG.debug("{} IDLE", this); + ReservedThreadExecutor.this.tryExecute(STOP); } } - Runnable task = _task==null?STOP:_task; - _task = null; - return task; + if (LOG.isDebugEnabled()) + LOG.debug("{} task={}", this, task); + + return task==null?STOP:task; } @Override @@ -292,44 +317,39 @@ public void run() while (isRunning()) { Runnable task = null; - try (Locker.Lock lock = _locker.lock()) - { - // Loop until size & pending updated - while(true) - { - int size = _size.get(); - if (size>=_capacity) - { - if (_starting) - _pending.decrementAndGet(); - return; - } - if (_size.compareAndSet(size,size+1)) - { - if (_starting) - _pending.decrementAndGet(); - break; - } - } - if (_starting) + // test and increment size BEFORE decrementing pending, + // so that we don't have a race starting new pending. + while(true) + { + int size = _size.get(); + if (size>=_capacity) { if (LOG.isDebugEnabled()) - LOG.debug("{} starting", this); - _starting = false; + LOG.debug("{} size {} > capacity", this, size, _capacity); + if (_starting) + _pending.decrementAndGet(); + return; } + if (_size.compareAndSet(size,size+1)) + break; + } - // Insert ourselves in the stack - _stack.push(this); - - // Wait for a task - if (LOG.isDebugEnabled()) - LOG.debug("{} waiting", this); - task = reservedWait(); + if (_starting) + { if (LOG.isDebugEnabled()) - LOG.debug("{} task={}", this, task); + LOG.debug("{} started", this); + _pending.decrementAndGet(); + _starting = false; } + // Insert ourselves in the stack. Size is already incremented, but + // that only effects the decision to keep other threads reserved. + _stack.push(this); + + // Wait for a task + task = reservedWait(); + if (task==STOP) // return on STOP poison pill break;