Skip to content

Commit

Permalink
Review suggestion
Browse files Browse the repository at this point in the history
+ Revert RTE to not mutate `_threads` on critical path.
+ filter `_threads` in dump and doStop
  • Loading branch information
gregw committed Aug 24, 2021
1 parent 898dac3 commit 69d7add
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
package org.eclipse.jetty.util.thread;

import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand Down Expand Up @@ -190,14 +191,10 @@ public void doStop() throws Exception
Thread.yield();
_queue.offer(STOP);
}

// Interrupt any reserved thread missed the offer,
// so they do not wait for the whole idle timeout.
for (ReservedThread reserved : _threads)
{
Thread thread = reserved._thread;
if (thread != null)
thread.interrupt();
}
_threads.stream().filter(ReservedThread::isReserved).map(t -> t._thread).forEach(Thread::interrupt);
_threads.clear();
_count.getAndSetHi(0);
}
Expand Down Expand Up @@ -256,6 +253,7 @@ private void startReservedThread()
try
{
ReservedThread thread = new ReservedThread();
_threads.add(thread);
_executor.execute(thread);
}
catch (Throwable e)
Expand All @@ -271,7 +269,7 @@ private void startReservedThread()
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("threads", _threads));
_threads.stream().filter(ReservedThread::isReserved).collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -300,55 +298,48 @@ private class ReservedThread implements Runnable
private volatile State _state = State.PENDING;
private volatile Thread _thread;

private boolean isReserved()
{
return Objects.equals(_state, State.RESERVED);
}

private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this);

// This is now a reserved thread.
// Note that this thread must be added to the reserved set
// before checking for lo<0 (i.e. stopped), see doStop().
_threads.add(this);
try
// Keep waiting until stopped, tasked or idle
while (_count.getLo() >= 0)
{
// Keep waiting until stopped, tasked or idle.
while (_count.getLo() >= 0)
try
{
try
{
// Always poll at some period as safety to ensure we don't poll forever.
Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this);
if (task != null)
return task;

// we have idled out
int size = _count.getLo();
// decrement size if we have not also been stopped.
while (size > 0)
{
if (_count.compareAndSetLo(size, --size))
break;
size = _count.getLo();
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;

}
catch (InterruptedException e)
// Always poll at some period as safety to ensure we don't poll forever.
Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this);
if (task != null)
return task;

// we have idled out
int size = _count.getLo();
// decrement size if we have not also been stopped.
while (size > 0)
{
LOG.ignore(e);
if (_count.compareAndSetLo(size, --size))
break;
size = _count.getLo();
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;

}
catch (InterruptedException e)
{
LOG.ignore(e);
}
_state = State.STOPPED;
return STOP;
}
finally
{
// No longer a reserved thread.
_threads.remove(this);
}
_state = State.STOPPED;
return STOP;
}

@Override
Expand Down Expand Up @@ -421,6 +412,7 @@ public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} exited {}", this, ReservedThreadExecutor.this);
_threads.remove(this);
_thread = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ public void run()
while (!reserved.tryExecute(() -> {}))
Thread.yield();

System.err.println(reserved.dump());

reserved.stop();
pool.stop();

Expand Down

0 comments on commit 69d7add

Please sign in to comment.