diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index cdf932f..fca5a5d 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -41,6 +41,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -278,6 +279,21 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme */ volatile Runnable terminationTask; + /** + * If the thread pool has reached its max and generated a thread dump. + */ + private AtomicBoolean reachedMax = new AtomicBoolean(); + + /** + * If a WARN is allowed upon thread exhaustion. + */ + private boolean allowExhaustionWarn; + + /** + * If a dump is allowed upon thread exhaustion. + */ + private boolean allowExhaustionDump; + // ======================================================= // Statistics fields and counters // ======================================================= @@ -379,6 +395,8 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme head = tail = new TaskNode(null); // thread stat threadStatus = withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize); + withAllowExhaustionWarn(builder.allowsExhaustionWarn()); + withAllowExhaustionDump(builder.allowsExhaustionDump()); timeoutNanos = max(1L, keepAliveTime); queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()); mxBean = new MXBeanImpl(); @@ -421,6 +439,8 @@ public static final class Builder { private TimeUnit keepAliveUnits = TimeUnit.SECONDS; private float growthResistance; private boolean allowCoreTimeOut; + private boolean allowExhaustionWarn; + private boolean allowExhaustionDump; private int maxQueueSize = Integer.MAX_VALUE; private boolean registerMBean = REGISTER_MBEAN; private String mBeanName; @@ -607,6 +627,50 @@ public Builder allowCoreThreadTimeOut(final boolean allowCoreTimeOut) { return this; } + /** + * Determine whether a WARN logged upon thread exhaustion is allowed. + * + * @return {@code true} if a WARN upon thread exhaustion is allowed, {@code false} otherwise + * @see EnhancedQueueExecutor#allowsExhaustionWarn() + */ + public boolean allowsExhaustionWarn() { + return allowExhaustionWarn; + } + + /** + * Establish whether a WARN logged upon thread exhaustion is allowed. + * + * @param allowExhaustionWarn {@code true} if a WARN upon thread exhaustion is allowed, {@code false} otherwise + * @return this builder + * @see EnhancedQueueExecutor#allowExhaustionWarn(boolean) + */ + public Builder allowExhaustionWarn(final boolean allowExhaustionWarn) { + this.allowExhaustionWarn = allowExhaustionWarn; + return this; + } + + /** + * Determine whether a thread dump logged upon thread exhaustion is allowed. + * + * @return {@code true} if a thread dump upon thread exhaustion is allowed, {@code false} otherwise + * @see EnhancedQueueExecutor#allowsExhaustionDump() + */ + public boolean allowsExhaustionDump() { + return allowExhaustionDump; + } + + /** + * Establish whether a thread dump logged upon thread exhaustion is allowed. + * + * @param allowExhaustionDump {@code true} if a thread dump upon thread exhaustion is allowed, {@code false} otherwise + * @return this builder + * @see EnhancedQueueExecutor#allowExhaustionDump(boolean) + */ + public Builder allowExhaustionDump(final boolean allowExhaustionDump) { + this.allowExhaustionDump = allowExhaustionDump; + return this; + } + /** * Get the maximum queue size. If the queue is full and a task cannot be immediately accepted, rejection will result. * @@ -1514,6 +1578,9 @@ int tryAllocateThread(final float growthResistance) { oldSize = currentSizeOf(oldStat); if (oldSize >= maxSizeOf(oldStat)) { // max threads already reached + if (reachedMax.compareAndSet(false, true) && isAllowExhaustionWarn()) { + ThreadDumpUtil.handleExhaustion(maxSizeOf(oldStat), isAllowExhaustionDump()); + } return AT_NO; } if (oldSize >= coreSizeOf(oldStat) && oldSize > 0) { @@ -1935,6 +2002,14 @@ static long withShutdownInterrupt(final long status) { return status | TS_SHUTDOWN_INTERRUPT; } + private boolean withAllowExhaustionWarn(final boolean allowed) { + return allowExhaustionWarn = allowed; + } + + private boolean withAllowExhaustionDump(final boolean allowed) { + return allowExhaustionDump = allowed; + } + static long withAllowCoreTimeout(final long status, final boolean allowed) { return allowed ? status | TS_ALLOW_CORE_TIMEOUT : status & ~TS_ALLOW_CORE_TIMEOUT; } @@ -1955,6 +2030,14 @@ static boolean isAllowCoreTimeout(final long oldVal) { return (oldVal & TS_ALLOW_CORE_TIMEOUT) != 0; } + private boolean isAllowExhaustionWarn() { + return allowExhaustionWarn; + } + + private boolean isAllowExhaustionDump() { + return allowExhaustionDump; + } + // ======================================================= // Static configuration // ======================================================= diff --git a/src/main/java/org/jboss/threads/Messages.java b/src/main/java/org/jboss/threads/Messages.java index 14b8555..096728a 100644 --- a/src/main/java/org/jboss/threads/Messages.java +++ b/src/main/java/org/jboss/threads/Messages.java @@ -97,6 +97,14 @@ interface Messages extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) void taskSubmitFailed(@Cause RejectedExecutionException e, Runnable task); + @Message(id = 15, value = "%s") + @LogMessage(level = Logger.Level.WARN) + void exhaustedPoolMessage(String msg); + + @Message(id = 16, value = "Exception thrown during generation of thread dump") + @LogMessage(level = Logger.Level.WARN) + void threadDumpException(@Cause Exception cause); + // validation @Message(id = 100, value = "Keep-alive may only be set to 0 for this executor type") diff --git a/src/main/java/org/jboss/threads/ThreadDumpUtil.java b/src/main/java/org/jboss/threads/ThreadDumpUtil.java new file mode 100644 index 0000000..4dc6b5f --- /dev/null +++ b/src/main/java/org/jboss/threads/ThreadDumpUtil.java @@ -0,0 +1,131 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jboss.threads; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.MonitorInfo; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.lang.StringBuilder; + +final class ThreadDumpUtil { + + public static void handleExhaustion(int maxPoolSize, boolean allowDump) { + StringBuilder msg = new StringBuilder(); + msg.append(String.format("Thread pool has reached %d max threads in use. Performance may be impacted.", maxPoolSize)); + if (allowDump) { + threadDump(msg); + } + Messages.msg.exhaustedPoolMessage(msg.toString()); + } + + public static void threadDump(StringBuilder sb) { + try { + sb.append(" Thread dump:\n" + + "*******************************************************************************\n"); + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + for (ThreadInfo threadInfo : threadMXBean.dumpAllThreads(true, true)) { + sb.append(threadInfoToString(threadInfo)); + } + sb.append("\n===============================================================================\n" + + "End Thread dump\n*******************************************************************************\n"); + + long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); + if (deadlockedThreads != null && deadlockedThreads.length > 0) { + sb.append("Deadlock detected!\n"+ + "*******************************************************************************\n" + + "{0}\n===============================================================================\n" + + "End Deadlock\n*******************************************************************************\n"); + for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(deadlockedThreads, true, true)) { + sb.append(threadInfoToString(threadInfo)); + } + } + } catch (Exception e) { + Messages.msg.threadDumpException(e); + } + } + + private static String threadInfoToString(ThreadInfo threadInfo) { + StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" + + " Id=" + threadInfo.getThreadId() + " " + + threadInfo.getThreadState()); + if (threadInfo.getLockName() != null) { + sb.append(" on " + threadInfo.getLockName()); + } + if (threadInfo.getLockOwnerName() != null) { + sb.append(" owned by \"" + threadInfo.getLockOwnerName() + + "\" Id=" + threadInfo.getLockOwnerId()); + } + if (threadInfo.isSuspended()) { + sb.append(" (suspended)"); + } + if (threadInfo.isInNative()) { + sb.append(" (in native)"); + } + sb.append('\n'); + int i = 0; + for (; i < threadInfo.getStackTrace().length; i++) { + StackTraceElement ste = threadInfo.getStackTrace()[i]; + sb.append("\tat " + ste.toString()); + sb.append('\n'); + if (i == 0 && threadInfo.getLockInfo() != null) { + Thread.State ts = threadInfo.getThreadState(); + switch (ts) { + case BLOCKED: + sb.append("\t- blocked on " + threadInfo.getLockInfo()); + sb.append('\n'); + break; + case WAITING: + sb.append("\t- waiting on " + threadInfo.getLockInfo()); + sb.append('\n'); + break; + case TIMED_WAITING: + sb.append("\t- waiting on " + threadInfo.getLockInfo()); + sb.append('\n'); + break; + default: + } + } + + for (MonitorInfo mi : threadInfo.getLockedMonitors()) { + if (mi.getLockedStackDepth() == i) { + sb.append("\t- locked " + mi); + sb.append('\n'); + } + } + } + + LockInfo[] locks = threadInfo.getLockedSynchronizers(); + if (locks.length > 0) { + sb.append("\n\tNumber of locked synchronizers = " + locks.length); + sb.append('\n'); + for (LockInfo li : locks) { + sb.append("\t- " + li); + sb.append('\n'); + } + } + sb.append('\n'); + return sb.toString(); + } + +}