Skip to content

Commit

Permalink
Remote: Async upload (Part 9)
Browse files Browse the repository at this point in the history
Update remote module to share a thread pool (with max size equals to --jobs) for gRPC and background uploads.

Part of bazelbuild#13655.

Closes bazelbuild#13655.

PiperOrigin-RevId: 395157789
  • Loading branch information
coeuvre authored and Copybara-Service committed Sep 7, 2021
1 parent 9cb5936 commit 581c81a
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

Expand All @@ -53,6 +54,7 @@ public final class GoogleAuthUtils {
* @throws IOException in case the channel can't be constructed.
*/
public static ManagedChannel newChannel(
@Nullable Executor executor,
String target,
String proxy,
AuthAndTLSOptions options,
Expand All @@ -71,6 +73,7 @@ public static ManagedChannel newChannel(
try {
NettyChannelBuilder builder =
newNettyChannelBuilder(targetUrl, proxy)
.executor(executor)
.negotiationType(
isTlsEnabled(target) ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
if (options.grpcKeepaliveTime != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ static Metadata makeGrpcMetadata(BackendConfig config) {
@VisibleForTesting
protected ManagedChannel newGrpcChannel(BackendConfig config) throws IOException {
return GoogleAuthUtils.newChannel(
/*executor=*/ null,
config.besBackend(),
config.besProxy(),
config.authAndTLSOptions(),
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ java_library(
":ExecutionStatusException",
":ReferenceCountedChannel",
":Retrier",
"//src/main/java/com/google/devtools/build/lib:build-request-options",
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:action_input_helper",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
Expand All @@ -32,30 +33,34 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.build.lib.vfs.Path;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;

/** Provides a remote execution context. */
final class RemoteActionContextProvider {

private final Executor executor;
private final CommandEnvironment env;
@Nullable private final RemoteCache cache;
@Nullable private final RemoteExecutionClient executor;
@Nullable private final RemoteCache remoteCache;
@Nullable private final RemoteExecutionClient remoteExecutor;
@Nullable private final ListeningScheduledExecutorService retryScheduler;
private final DigestUtil digestUtil;
@Nullable private final Path logDir;
private ImmutableSet<ActionInput> filesToDownload = ImmutableSet.of();
private RemoteExecutionService remoteExecutionService;

private RemoteActionContextProvider(
Executor executor,
CommandEnvironment env,
@Nullable RemoteCache cache,
@Nullable RemoteExecutionClient executor,
@Nullable RemoteCache remoteCache,
@Nullable RemoteExecutionClient remoteExecutor,
@Nullable ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil,
@Nullable Path logDir) {
this.env = Preconditions.checkNotNull(env, "env");
this.cache = cache;
this.executor = executor;
this.env = Preconditions.checkNotNull(env, "env");
this.remoteCache = remoteCache;
this.remoteExecutor = remoteExecutor;
this.retryScheduler = retryScheduler;
this.digestUtil = digestUtil;
this.logDir = logDir;
Expand All @@ -66,27 +71,41 @@ public static RemoteActionContextProvider createForPlaceholder(
ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil) {
return new RemoteActionContextProvider(
env, /*cache=*/ null, /*executor=*/ null, retryScheduler, digestUtil, /*logDir=*/ null);
directExecutor(),
env,
/*remoteCache=*/ null,
/*remoteExecutor=*/ null,
retryScheduler,
digestUtil,
/*logDir=*/ null);
}

public static RemoteActionContextProvider createForRemoteCaching(
Executor executor,
CommandEnvironment env,
RemoteCache cache,
RemoteCache remoteCache,
ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil) {
return new RemoteActionContextProvider(
env, cache, /*executor=*/ null, retryScheduler, digestUtil, /*logDir=*/ null);
executor,
env,
remoteCache,
/*remoteExecutor=*/ null,
retryScheduler,
digestUtil,
/*logDir=*/ null);
}

public static RemoteActionContextProvider createForRemoteExecution(
Executor executor,
CommandEnvironment env,
RemoteExecutionCache cache,
RemoteExecutionClient executor,
RemoteExecutionCache remoteCache,
RemoteExecutionClient remoteExecutor,
ListeningScheduledExecutorService retryScheduler,
DigestUtil digestUtil,
Path logDir) {
return new RemoteActionContextProvider(
env, cache, executor, retryScheduler, digestUtil, logDir);
executor, env, remoteCache, remoteExecutor, retryScheduler, digestUtil, logDir);
}

private RemotePathResolver createRemotePathResolver() {
Expand Down Expand Up @@ -120,6 +139,7 @@ private RemoteExecutionService getRemoteExecutionService() {
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class)).verboseFailures;
remoteExecutionService =
new RemoteExecutionService(
executor,
env.getReporter(),
verboseFailures,
env.getExecRoot(),
Expand All @@ -128,8 +148,8 @@ private RemoteExecutionService getRemoteExecutionService() {
env.getCommandId().toString(),
digestUtil,
checkNotNull(env.getOptions().getOptions(RemoteOptions.class)),
cache,
executor,
remoteCache,
remoteExecutor,
filesToDownload,
captureCorruptedOutputsDir);
env.getEventBus().register(remoteExecutionService);
Expand Down Expand Up @@ -179,11 +199,11 @@ public void registerSpawnCache(ModuleActionContextRegistry.Builder registryBuild

/** Returns the remote cache. */
RemoteCache getRemoteCache() {
return cache;
return remoteCache;
}

RemoteExecutionClient getRemoteExecutionClient() {
return executor;
return remoteExecutor;
}

void setFilesToDownload(ImmutableSet<ActionInput> topLevelOutputs) {
Expand All @@ -194,11 +214,11 @@ public void afterCommand() {
if (remoteExecutionService != null) {
remoteExecutionService.shutdown();
} else {
if (cache != null) {
cache.release();
if (remoteCache != null) {
remoteCache.release();
}
if (executor != null) {
executor.close();
if (remoteExecutor != null) {
remoteExecutor.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import com.google.protobuf.Message;
import io.grpc.Status.Code;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
Expand All @@ -132,6 +133,7 @@
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

Expand All @@ -153,10 +155,13 @@ public class RemoteExecutionService {
private final ImmutableSet<PathFragment> filesToDownload;
@Nullable private final Path captureCorruptedOutputsDir;

private final Scheduler scheduler;

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean buildInterrupted = new AtomicBoolean(false);

public RemoteExecutionService(
Executor executor,
Reporter reporter,
boolean verboseFailures,
Path execRoot,
Expand Down Expand Up @@ -185,6 +190,8 @@ public RemoteExecutionService(
}
this.filesToDownload = filesToDownloadBuilder.build();
this.captureCorruptedOutputsDir = captureCorruptedOutputsDir;

this.scheduler = Schedulers.from(executor, /*interruptibleWorker=*/ true);
}

static Command buildCommand(
Expand Down Expand Up @@ -1065,7 +1072,7 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
remoteCache ->
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
RemoteCache::release)
.subscribeOn(Schedulers.io())
.subscribeOn(scheduler)
.subscribe(
new SingleObserver<ActionResult>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.devtools.build.lib.remote;

import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.DigestFunction;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.auth.Credentials;
Expand All @@ -27,6 +29,7 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionGraph;
import com.google.devtools.build.lib.actions.ActionInput;
Expand All @@ -50,6 +53,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader;
import com.google.devtools.build.lib.buildtool.BuildRequest;
import com.google.devtools.build.lib.buildtool.BuildRequestOptions;
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
Expand Down Expand Up @@ -102,7 +106,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

/** RemoteModule provides distributed cache and remote execution for Bazel. */
public final class RemoteModule extends BlazeModule {
Expand All @@ -114,6 +122,8 @@ public final class RemoteModule extends BlazeModule {
private final ListeningScheduledExecutorService retryScheduler =
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));

private ExecutorService executorService;

private RemoteActionContextProvider actionContextProvider;
private RemoteActionInputFetcher actionInputFetcher;
private RemoteOutputsMode remoteOutputsMode;
Expand All @@ -129,7 +139,11 @@ public ManagedChannel newChannel(
List<ClientInterceptor> interceptors)
throws IOException {
return GoogleAuthUtils.newChannel(
target, proxy, options, interceptors.isEmpty() ? null : interceptors);
executorService,
target,
proxy,
options,
interceptors.isEmpty() ? null : interceptors);
}
};

Expand Down Expand Up @@ -224,7 +238,7 @@ private void initHttpAndDiskCache(
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
env, remoteCache, /* retryScheduler= */ null, digestUtil);
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
}

@Override
Expand Down Expand Up @@ -293,6 +307,24 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
env.getOutputBase().getRelative(env.getRuntime().getProductName() + "-remote-logs");
cleanAndCreateRemoteLogsDir(logDir);

BuildRequestOptions buildRequestOptions =
env.getOptions().getOptions(BuildRequestOptions.class);

int jobs = 0;
if (buildRequestOptions != null) {
jobs = buildRequestOptions.jobs;
}

ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("remote-executor-%d").build();
if (jobs != 0) {
executorService =
new ThreadPoolExecutor(
/*corePoolSize=*/ 0, jobs, 60L, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
} else {
executorService = Executors.newCachedThreadPool(threadFactory);
}

if ((enableHttpCache || enableDiskCache) && !enableGrpcCache) {
initHttpAndDiskCache(env, authAndTlsOptions, remoteOptions, digestUtil);
return;
Expand Down Expand Up @@ -547,7 +579,13 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteExecution(
env, remoteCache, remoteExecutor, retryScheduler, digestUtil, logDir);
executorService,
env,
remoteCache,
remoteExecutor,
retryScheduler,
digestUtil,
logDir);
repositoryRemoteExecutorFactoryDelegate.init(
new RemoteRepositoryRemoteExecutorFactory(
remoteCache,
Expand Down Expand Up @@ -578,7 +616,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
env, remoteCache, retryScheduler, digestUtil);
executorService, env, remoteCache, retryScheduler, digestUtil);
}

if (enableRemoteDownloader) {
Expand Down Expand Up @@ -773,6 +811,7 @@ public void afterCommand() throws AbruptExitException {
logger.atWarning().withCause(e).log(failureMessage);
}

executorService = null;
buildEventArtifactUploaderFactoryDelegate.reset();
repositoryRemoteExecutorFactoryDelegate.reset();
remoteDownloaderSupplier.set(null);
Expand Down

0 comments on commit 581c81a

Please sign in to comment.