Skip to content

Commit

Permalink
Replace BoundedExecutor with BoundedThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed May 1, 2016
1 parent 2bc3ee5 commit c9ff7c6
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 29 deletions.
Expand Up @@ -32,7 +32,6 @@
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.http.client.HttpClient;
import io.airlift.json.JsonCodec;
Expand All @@ -43,14 +42,13 @@
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static io.airlift.concurrent.BoundedThreadPool.newBoundedThreadPool;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

public class HttpRemoteTaskFactory
Expand All @@ -64,8 +62,7 @@ public class HttpRemoteTaskFactory
private final Duration minErrorDuration;
private final Duration taskStatusRefreshMaxWait;
private final Duration taskInfoUpdateInterval;
private final ExecutorService coreExecutor;
private final Executor executor;
private final ExecutorService executor;
private final ThreadPoolExecutorMBean executorMBean;
private final ScheduledExecutorService updateScheduledExecutor;
private final ScheduledExecutorService errorScheduledExecutor;
Expand All @@ -89,9 +86,8 @@ public HttpRemoteTaskFactory(QueryManagerConfig config,
this.minErrorDuration = config.getRemoteTaskMinErrorDuration();
this.taskStatusRefreshMaxWait = taskConfig.getStatusRefreshMaxWait();
this.taskInfoUpdateInterval = taskConfig.getInfoUpdateInterval();
this.coreExecutor = newCachedThreadPool(daemonThreadsNamed("remote-task-callback-%s"));
this.executor = new BoundedExecutor(coreExecutor, config.getRemoteTaskMaxCallbackThreads());
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor);
this.executor = newBoundedThreadPool(config.getRemoteTaskMaxCallbackThreads(), daemonThreadsNamed("remote-task-callback-%s"));
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
this.stats = requireNonNull(stats, "stats is null");

this.updateScheduledExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("task-info-update-scheduler-%s"));
Expand All @@ -108,7 +104,7 @@ public ThreadPoolExecutorMBean getExecutor()
@PreDestroy
public void stop()
{
coreExecutor.shutdownNow();
executor.shutdownNow();
updateScheduledExecutor.shutdownNow();
errorScheduledExecutor.shutdownNow();
}
Expand Down
Expand Up @@ -102,7 +102,6 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.node.NodeInfo;
Expand All @@ -120,6 +119,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.concurrent.BoundedThreadPool.newBoundedThreadPool;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.discovery.client.DiscoveryBinder.discoveryBinder;
Expand Down Expand Up @@ -379,17 +379,9 @@ public ScheduledExecutorService createExchangeExecutor(ExchangeClientConfig conf
@Provides
@Singleton
@ForAsyncHttp
public static ExecutorService createAsyncHttpResponseCoreExecutor()
public static ExecutorService createAsyncHttpResponseCoreExecutor(TaskManagerConfig config)
{
return newCachedThreadPool(daemonThreadsNamed("async-http-response-%s"));
}

@Provides
@Singleton
@ForAsyncHttp
public static BoundedExecutor createAsyncHttpResponseExecutor(@ForAsyncHttp ExecutorService coreExecutor, TaskManagerConfig config)
{
return new BoundedExecutor(coreExecutor, config.getHttpResponseThreads());
return newBoundedThreadPool(config.getHttpResponseThreads(), daemonThreadsNamed("async-http-response-%s"));
}

@Provides
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.facebook.presto.spi.Page;
import com.google.common.collect.ImmutableList;
import com.google.common.reflect.TypeToken;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.stats.TimeStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -55,6 +54,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -91,7 +91,7 @@ public class TaskResource
@Inject
public TaskResource(TaskManager taskManager,
SessionPropertyManager sessionPropertyManager,
@ForAsyncHttp BoundedExecutor responseExecutor,
@ForAsyncHttp ExecutorService responseExecutor,
@ForAsyncHttp ScheduledExecutorService timeoutExecutor)
{
this.taskManager = requireNonNull(taskManager, "taskManager is null");
Expand Down
Expand Up @@ -17,8 +17,6 @@
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.concurrent.ExecutorServiceAdapter;
import io.airlift.units.Duration;

import javax.annotation.PreDestroy;
Expand All @@ -28,9 +26,9 @@
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_TIMEOUT;
import static io.airlift.concurrent.BoundedThreadPool.newBoundedThreadPool;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TimeoutBackupStore
Expand All @@ -45,8 +43,8 @@ public TimeoutBackupStore(BackupStore store, String connectorId, Duration timeou
requireNonNull(connectorId, "connectorId is null");
requireNonNull(timeout, "timeout is null");

this.executor = newCachedThreadPool(daemonThreadsNamed("backup-proxy-" + connectorId + "-%s"));
this.store = timeLimited(store, BackupStore.class, timeout, executor, maxThreads);
this.executor = newBoundedThreadPool(maxThreads, daemonThreadsNamed("backup-proxy-" + connectorId + "-%s"));
this.store = timeLimited(store, BackupStore.class, timeout, executor);
}

@PreDestroy
Expand Down Expand Up @@ -99,9 +97,8 @@ public boolean shardExists(UUID uuid)
}
}

private static <T> T timeLimited(T target, Class<T> clazz, Duration timeout, ExecutorService executor, int maxThreads)
private static <T> T timeLimited(T target, Class<T> clazz, Duration timeout, ExecutorService executor)
{
executor = new ExecutorServiceAdapter(new BoundedExecutor(executor, maxThreads));
TimeLimiter limiter = new SimpleTimeLimiter(executor);
return limiter.newProxy(target, clazz, timeout.toMillis(), MILLISECONDS);
}
Expand Down

0 comments on commit c9ff7c6

Please sign in to comment.