Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x 3550 queued thread pool stalled #3586

Merged
merged 5 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void setThreadPoolBudget(ThreadPoolBudget budget)
@Override
protected void doStart() throws Exception
{
_tryExecutor = new ReservedThreadExecutor(this,_reservedThreads);
_tryExecutor = _reservedThreads==0 ? NO_TRY : new ReservedThreadExecutor(this,_reservedThreads);
addBean(_tryExecutor);

super.doStart();
Expand Down Expand Up @@ -473,7 +474,7 @@ public void execute(Runnable job)
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
if (getQueueSize() > 0 && getIdleThreads() == 0)
startThreads(1);
}
}
Expand Down Expand Up @@ -603,7 +604,7 @@ public void dump(Appendable out, String indent) throws IOException
String knownMethod = "";
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool"))
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().equals(Runner.class.getName()))
{
knownMethod = "IDLE ";
break;
Expand Down Expand Up @@ -636,11 +637,10 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public void dump(Appendable out, String indent) throws IOException
{
String s = thread.getId()+" "+thread.getName()+" "+thread.getState()+" "+thread.getPriority();
if (known.length()==0)
Dumpable.dumpObjects(out, indent, s, (Object[])trace);
if (StringUtil.isBlank(known))
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace);
else
Dumpable.dumpObjects(out, indent, s);
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
}

@Override
Expand Down Expand Up @@ -671,7 +671,7 @@ public String dump()
@Override
public String toString()
{
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
Expand All @@ -680,11 +680,84 @@ public String toString()
getThreads(),
getMaxThreads(),
getIdleThreads(),
getReservedThreads(),
_jobs.size(),
_tryExecutor);
}

private Runnable _runnable = new Runnable()
private final Runnable _runnable = new Runner();

/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
* <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
*
* @param job the job to run
*/
protected void runJob(Runnable job)
{
job.run();
}

/**
* @return the job queue
*/
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}

/**
* @param queue the job queue
* @deprecated pass the queue to the constructor instead
*/
@Deprecated
public void setQueue(BlockingQueue<Runnable> queue)
{
throw new UnsupportedOperationException("Use constructor injection");
}

/**
* @param id the thread ID to interrupt.
* @return true if the thread was found and interrupted.
*/
@ManagedOperation("interrupts a pool thread")
public boolean interruptThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
thread.interrupt();
return true;
}
}
return false;
}

/**
* @param id the thread ID to interrupt.
* @return the stack frames dump
*/
@ManagedOperation("dumps a pool thread stack")
public String dumpThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
return buf.toString();
}
}
return null;
}

private static Runnable SHRINK = ()->{};
private class Runner implements Runnable
{
@Override
public void run()
Expand All @@ -707,24 +780,12 @@ public void run()
_threadsIdle.incrementAndGet();
}

if (_idleTimeout <= 0)
job = _jobs.take();
else
job = idleJobPoll();
if (job == SHRINK)
{
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
break;
}
}

job = _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", this);
break;
}
}

Expand Down Expand Up @@ -769,78 +830,32 @@ public void run()

removeThread(Thread.currentThread());

if (_threadsStarted.decrementAndGet() < getMinThreads())
int threads = _threadsStarted.decrementAndGet();
// We should start a new thread if threads are now less than min threads or we have queued jobs
if (threads < getMinThreads() || getQueueSize()>0)
startThreads(1);
}
}
};

/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
* <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
*
* @param job the job to run
*/
protected void runJob(Runnable job)
{
job.run();
}

/**
* @return the job queue
*/
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}

/**
* @param queue the job queue
* @deprecated pass the queue to the constructor instead
*/
@Deprecated
public void setQueue(BlockingQueue<Runnable> queue)
{
throw new UnsupportedOperationException("Use constructor injection");
}

/**
* @param id the thread ID to interrupt.
* @return true if the thread was found and interrupted.
*/
@ManagedOperation("interrupts a pool thread")
public boolean interruptThread(@Name("id") long id)
{
for (Thread thread : _threads)
private Runnable idleJobPoll() throws InterruptedException
{
if (thread.getId() == id)
{
thread.interrupt();
return true;
}
}
return false;
}
if (_idleTimeout <= 0)
return _jobs.take();

/**
* @param id the thread ID to interrupt.
* @return the stack frames dump
*/
@ManagedOperation("dumps a pool thread stack")
public String dumpThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
return buf.toString();
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
return SHRINK;
}
}

return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,19 @@ public String toString()
}
}

public static final TryExecutor NO_TRY = task -> false;
TryExecutor NO_TRY = new TryExecutor()
{
@Override
public boolean tryExecute(Runnable task)
{
return false;
}

@Override
public String toString()
{
return "NO_TRY";
}
};

}