Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Limit queue of thread pools
Browse files Browse the repository at this point in the history
Prevents the queues to grow indefinitely.
See stagemonitor/stagemonitor-mailinglist#16
  • Loading branch information
Felix Barnsteiner committed Oct 1, 2015
1 parent d318e97 commit 76308e1
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class CorePlugin extends StagemonitorPlugin {
public static final String DEFAULT_APPLICATION_NAME = "My Application";

private static final String CORE_PLUGIN_NAME = "Core";
public static final String POOLS_QUEUE_CAPACITY_LIMIT_KEY = "stagemonitor.threadPools.queueCapacityLimit";

private final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -310,6 +311,18 @@ public class CorePlugin extends StagemonitorPlugin {
.defaultValue(Collections.<String>emptySet())
.configurationCategory(CORE_PLUGIN_NAME)
.build();
private final ConfigurationOption<Integer> threadPoolQueueCapacityLimit = ConfigurationOption.integerOption()
.key(POOLS_QUEUE_CAPACITY_LIMIT_KEY)
.dynamic(false)
.label("Thread Pool Queue Capacity Limit")
.description("Sets a limit to the number of pending tasks in the ExecutorServices stagemonitor uses. " +
"These are thread pools that are used for example to report request traces to elasticsearch. " +
"If elasticsearch is unreachable or your application encounters a spike in incoming requests this limit could be reached. " +
"It is used to prevent the queue from growing indefinitely. ")
.defaultValue(1000)
.configurationCategory(CORE_PLUGIN_NAME)
.tags("advanced")
.build();

private static MetricsAggregationReporter aggregationReporter;

Expand Down Expand Up @@ -518,4 +531,8 @@ public boolean isAttachAgentAtRuntime() {
public Collection<String> getExcludedInstrumenters() {
return excludedInstrumenters.getValue();
}

public int getThreadPoolQueueCapacityLimit() {
return threadPoolQueueCapacityLimit.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

Expand All @@ -28,6 +26,7 @@
import org.stagemonitor.core.Stagemonitor;
import org.stagemonitor.core.pool.JavaThreadPoolMetricsCollectorImpl;
import org.stagemonitor.core.pool.PooledResourceMetricsRegisterer;
import org.stagemonitor.core.util.ExecutorUtils;
import org.stagemonitor.core.util.HttpClient;
import org.stagemonitor.core.util.JsonUtils;
import org.stagemonitor.core.util.StringUtils;
Expand All @@ -40,24 +39,16 @@ public class ElasticsearchClient {
private final HttpClient httpClient;
private final CorePlugin corePlugin;

public final ThreadPoolExecutor asyncRestPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("async-elasticsearch");
return thread;
}
});

private final ThreadPoolExecutor asyncRestPool;

public ElasticsearchClient() {
this(Stagemonitor.getConfiguration().getConfig(CorePlugin.class));
}

public ElasticsearchClient(CorePlugin corePlugin) {
this.corePlugin = corePlugin;
asyncRestPool = ExecutorUtils
.createSingleThreadDeamonPool("async-elasticsearch", corePlugin.getThreadPoolQueueCapacityLimit());
if (corePlugin.isInternalMonitoringActive()) {
JavaThreadPoolMetricsCollectorImpl pooledResource = new JavaThreadPoolMetricsCollectorImpl(asyncRestPool, "internal.asyncRestPool");
PooledResourceMetricsRegisterer.registerPooledResource(pooledResource, Stagemonitor.getMetricRegistry());
Expand Down Expand Up @@ -107,12 +98,16 @@ public int sendAsJson(final String method, final String path, final Object reque

public Future<?> sendAsJsonAsync(final String method, final String path, final Object requestBody) {
if (StringUtils.isNotEmpty(corePlugin.getElasticsearchUrl())) {
return asyncRestPool.submit(new Runnable() {
@Override
public void run() {
sendAsJson(method, path, requestBody);
}
});
try {
return asyncRestPool.submit(new Runnable() {
@Override
public void run() {
sendAsJson(method, path, requestBody);
}
});
} catch (RejectedExecutionException e) {
ExecutorUtils.logRejectionWarning(e);
}
}
return new CompletedFuture<Object>(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.stagemonitor.core.util;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stagemonitor.core.CorePlugin;

public final class ExecutorUtils {

private static final Logger logger = LoggerFactory.getLogger(ExecutorUtils.class);

private ExecutorUtils() {
// don't instantiate
}

public static ThreadPoolExecutor createSingleThreadDeamonPool(final String threadName, int queueCapacity) {
final ThreadFactory daemonThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(threadName);
return thread;
}
};
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), daemonThreadFactory) {
@Override
public String toString() {
return super.toString() + "(thread name = " + threadName + ")";
}
};
}

public static void logRejectionWarning(RejectedExecutionException e) {
logger.warn("The limit of pending tasks for the executor is reached. " +
"This could be due to a unreachable service such as elasticsearch or due to a spike in incoming requests. " +
"Consider increasing the default capacity limit with the configuration key '" + CorePlugin.POOLS_QUEUE_CAPACITY_LIMIT_KEY + "'\n"
+ e.getMessage());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.stagemonitor.core.util;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import org.junit.Test;

public class ExecutorUtilsTest {

final ThreadPoolExecutor lowCapacityPool = ExecutorUtils.createSingleThreadDeamonPool("test-pool", 1);
private Runnable sleepABit = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

@Test(expected = RejectedExecutionException.class)
public void testRejectedExecution() throws Exception {
for (int i = 0; i < 10; i++) {
lowCapacityPool.submit(sleepABit);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -28,6 +26,7 @@
import org.stagemonitor.core.Stagemonitor;
import org.stagemonitor.core.configuration.Configuration;
import org.stagemonitor.core.util.GraphiteSanitizer;
import org.stagemonitor.core.util.ExecutorUtils;
import org.stagemonitor.requestmonitor.profiler.CallStackElement;
import org.stagemonitor.requestmonitor.profiler.Profiler;

Expand All @@ -54,15 +53,7 @@ public class RequestMonitor {

private final List<Runnable> onAfterRequestCallbacks = new LinkedList<Runnable>();

private static ExecutorService asyncRequestTraceReporterPool = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("async-request-reporter");
return thread;
}
});
private ExecutorService asyncRequestTraceReporterPool;

private int warmupRequests = 0;
private AtomicBoolean warmedUp = new AtomicBoolean(false);
Expand Down Expand Up @@ -93,6 +84,8 @@ public RequestMonitor(CorePlugin corePlugin, MetricRegistry registry, RequestMon
this.requestMonitorPlugin = requestMonitorPlugin;
warmupRequests = requestMonitorPlugin.getNoOfWarmupRequests();
endOfWarmup = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(requestMonitorPlugin.getWarmupSeconds()));
asyncRequestTraceReporterPool = ExecutorUtils
.createSingleThreadDeamonPool("async-request-reporter", corePlugin.getThreadPoolQueueCapacityLimit());
}

public <T extends RequestTrace> void monitorStart(MonitoredRequest<T> monitoredRequest) {
Expand Down Expand Up @@ -322,7 +315,7 @@ public void run() {
}
});
} catch (RejectedExecutionException e) {
logger.warn(e.getMessage() + " (this exception is ignored)", e);
ExecutorUtils.logRejectionWarning(e);
}
}

Expand Down

0 comments on commit 76308e1

Please sign in to comment.