Skip to content

Commit

Permalink
Borrow WC variants of executor views to reduce thread count and incre…
Browse files Browse the repository at this point in the history
…ase reuse (#4877)

* Borrow WC variants of executor views to reduce thread count and increase reuse

Note that this includes an API+ABI break for fixed-size-executor factory
methods which have been updated to return ExecutorService instead
of ThreadPoolExecutor.

* changelog
  • Loading branch information
carterkozak committed Jul 2, 2020
1 parent 9a6604f commit abedbf0
Show file tree
Hide file tree
Showing 14 changed files with 940 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
Expand All @@ -33,7 +31,6 @@
import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.CqlClientFactory;
import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.DefaultCqlClientFactory;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.tracing.Tracers;

Expand Down Expand Up @@ -88,10 +85,7 @@ public ExecutorService visit(CqlCapableConfig cqlCapableConfig) {
* @return a new dynamic thread pool with a thread keep alive time of 1 minute
*/
private static ExecutorService createThreadPool(int maxPoolSize) {
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
NamedThreadFactory threadFactory = new NamedThreadFactory("Atlas Cassandra Async KVS", false);

return PTExecutors.newThreadPoolExecutor(0, maxPoolSize, 1, TimeUnit.MINUTES, workQueue, threadFactory);
return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas Cassandra Async KVS");
}

private ExecutorService tracingExecutorService(ExecutorService executorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -89,10 +88,8 @@ protected static ExecutorService createFixedThreadPool(String threadNamePrefix,
* @param maxPoolSize maximum size of the pool
* @return a new fixed size thread pool with a keep alive time of 1 minute
*/
protected static ExecutorService createThreadPool(String threadNamePrefix, int corePoolSize, int maxPoolSize) {
return PTExecutors.newThreadPoolExecutor(corePoolSize, maxPoolSize,
1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new NamedThreadFactory(threadNamePrefix, false));
protected static ExecutorService createThreadPool(String threadNamePrefix, int _corePoolSize, int maxPoolSize) {
return PTExecutors.newFixedThreadPool(maxPoolSize, threadNamePrefix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.immutables.value.Value;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
Expand All @@ -49,7 +44,6 @@
import com.palantir.atlasdb.http.NotCurrentLeaderExceptionMapper;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.api.config.service.UserAgent;
Expand All @@ -59,7 +53,6 @@
import com.palantir.leader.LeaderElectionServiceBuilder;
import com.palantir.leader.LeadershipObserver;
import com.palantir.leader.LocalPingableLeader;
import com.palantir.leader.PaxosLeaderElectionService;
import com.palantir.leader.PaxosLeadershipEventRecorder;
import com.palantir.leader.PingableLeader;
import com.palantir.paxos.ImmutableLeaderPingerContext;
Expand Down Expand Up @@ -239,20 +232,7 @@ private static <T> Map<T, ExecutorService> createExecutorsForService(
// TODO (jkong): Make the limits configurable.
// Current use cases tend to have not more than 10 (<< 100) inflight tasks under normal circumstances.
private static ExecutorService createExecutor(MetricsManager metricsManager, String useCase, int corePoolSize) {
return new InstrumentedExecutorService(
PTExecutors.newThreadPoolExecutor(
corePoolSize,
100,
5000,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
daemonThreadFactory("atlas-leaders-election-" + useCase)),
metricsManager.getRegistry(),
MetricRegistry.name(PaxosLeaderElectionService.class, useCase, "executor"));
}

private static ThreadFactory daemonThreadFactory(String name) {
return new NamedThreadFactory(name, true);
return PTExecutors.newCachedThreadPoolWithMaxThreads(100, "atlas-leaders-election-" + useCase);
}

public static <T> List<T> createProxyAndLocalList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -121,7 +119,6 @@
import com.palantir.common.base.ClosableIterators;
import com.palantir.common.base.Throwables;
import com.palantir.common.collect.Maps2;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.exception.PalantirSqlException;
import com.palantir.logsafe.Preconditions;
Expand Down Expand Up @@ -248,14 +245,8 @@ private DbKvs(ExecutorService executor,
this.getCandidateCellsForSweepingStrategy = getCandidateCellsForSweepingStrategy;
}

private static ThreadPoolExecutor newFixedThreadPool(int maxPoolSize) {
ThreadPoolExecutor pool = PTExecutors.newThreadPoolExecutor(maxPoolSize, maxPoolSize,
15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("Atlas DbKvs reader", true /* daemon */));

pool.allowCoreThreadTimeOut(false);
return pool;
private static ExecutorService newFixedThreadPool(int maxPoolSize) {
return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas DbKvs reader");
}

private void init() {
Expand Down
14 changes: 14 additions & 0 deletions changelog/@unreleased/pr-4877.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
changes:
- type: improvement
improvement:
description: |-
PTExecutors simple cached and fixed executor factories (those which don't consume a ThreadFactory) use views
over a shared executor service to reduce total thread count and promote resource reuse.
links:
- https://github.com/palantir/atlasdb/pull/4877
- type: break
break:
description: |-
`PTExecutors.newFixedThreadPool` overloads return `ExecutorService` instead of the concrete `ThreadPoolExecutor` type.
links:
- https://github.com/palantir/atlasdb/pull/4877
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,12 @@ public static boolean assertNotOnSqlThread() {

private static final String SELECT_THREAD_NAME = "SQL select statement"; //$NON-NLS-1$
private static final String EXECUTE_THREAD_NAME = "SQL execute statement"; //$NON-NLS-1$
private static final int KEEP_SQL_THREAD_ALIVE_TIMEOUT = 3000; //3 seconds

// TODO (jkong): Should these be lazily initialized?
private static final Supplier<ExecutorService> DEFAULT_SELECT_EXECUTOR =
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(
new NamedThreadFactory(SELECT_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT));
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(SELECT_THREAD_NAME));
static final Supplier<ExecutorService> DEFAULT_EXECUTE_EXECUTOR =
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(
new NamedThreadFactory(EXECUTE_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT));
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(EXECUTE_THREAD_NAME));

private ExecutorService selectStatementExecutor;
private ExecutorService executeStatementExecutor;
Expand Down
Loading

0 comments on commit abedbf0

Please sign in to comment.