From 128ede5ec9653e27799360019e745bb6fd9fe81f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 24 Aug 2021 10:56:47 +1000 Subject: [PATCH] Review suggestion + Revert RTE to not mutate `_threads` on critical path. + filter `_threads` in dump and doStop (cherry picked from commit 69d7add9e3ce1320ec78c329b3741ff61026658f) --- .../util/thread/ReservedThreadExecutor.java | 84 +++++++++---------- .../thread/ReservedThreadExecutorTest.java | 2 + 2 files changed, 40 insertions(+), 46 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index e4145d8da9bc..d71a357199ed 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.util.thread; import java.io.IOException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -26,6 +27,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.ProcessorUtils; @@ -33,7 +35,6 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.Dumpable; -import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -190,14 +191,10 @@ public void doStop() throws Exception Thread.yield(); _queue.offer(STOP); } + // Interrupt any reserved thread missed the offer, // so they do not wait for the whole idle timeout. - for (ReservedThread reserved : _threads) - { - Thread thread = reserved._thread; - if (thread != null) - thread.interrupt(); - } + _threads.stream().filter(ReservedThread::isReserved).map(t -> t._thread).forEach(Thread::interrupt); _threads.clear(); _count.getAndSetHi(0); } @@ -256,6 +253,7 @@ private void startReservedThread() try { ReservedThread thread = new ReservedThread(); + _threads.add(thread); _executor.execute(thread); } catch (Throwable e) @@ -271,7 +269,7 @@ private void startReservedThread() public void dump(Appendable out, String indent) throws IOException { Dumpable.dumpObjects(out, indent, this, - new DumpableCollection("threads", _threads)); + _threads.stream().filter(ReservedThread::isReserved).collect(Collectors.toList())); } @Override @@ -300,55 +298,48 @@ private class ReservedThread implements Runnable private volatile State _state = State.PENDING; private volatile Thread _thread; + private boolean isReserved() + { + return Objects.equals(_state, State.RESERVED); + } + private Runnable reservedWait() { if (LOG.isDebugEnabled()) LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this); - // This is now a reserved thread. - // Note that this thread must be added to the reserved set - // before checking for lo<0 (i.e. stopped), see doStop(). - _threads.add(this); - try + // Keep waiting until stopped, tasked or idle + while (_count.getLo() >= 0) { - // Keep waiting until stopped, tasked or idle. - while (_count.getLo() >= 0) + try { - try - { - // Always poll at some period as safety to ensure we don't poll forever. - Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this); - if (task != null) - return task; - - // we have idled out - int size = _count.getLo(); - // decrement size if we have not also been stopped. - while (size > 0) - { - if (_count.compareAndSetLo(size, --size)) - break; - size = _count.getLo(); - } - _state = size >= 0 ? State.IDLE : State.STOPPED; - return STOP; - - } - catch (InterruptedException e) + // Always poll at some period as safety to ensure we don't poll forever. + Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS); + if (LOG.isDebugEnabled()) + LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this); + if (task != null) + return task; + + // we have idled out + int size = _count.getLo(); + // decrement size if we have not also been stopped. + while (size > 0) { - LOG.ignore(e); + if (_count.compareAndSetLo(size, --size)) + break; + size = _count.getLo(); } + _state = size >= 0 ? State.IDLE : State.STOPPED; + return STOP; + + } + catch (InterruptedException e) + { + LOG.ignore(e); } - _state = State.STOPPED; - return STOP; - } - finally - { - // No longer a reserved thread. - _threads.remove(this); } + _state = State.STOPPED; + return STOP; } @Override @@ -421,6 +412,7 @@ public void run() { if (LOG.isDebugEnabled()) LOG.debug("{} exited {}", this, ReservedThreadExecutor.this); + _threads.remove(this); _thread = null; } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index 654b248928d1..f15b707047c7 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -360,6 +360,8 @@ public void run() while (!reserved.tryExecute(() -> {})) Thread.yield(); + System.err.println(reserved.dump()); + reserved.stop(); pool.stop();