Skip to content

Commit

Permalink
Remote: Use parameters instead of thread-local storage to provide tra…
Browse files Browse the repository at this point in the history
…cing metadata. (Part 3)

Change RemoteCacheClient#downloadBlob to use RemoteActionExecutionContext.

PiperOrigin-RevId: 354239205
  • Loading branch information
Googler authored and Copybara-Service committed Jan 28, 2021
1 parent 92955e6 commit 75bd1ff
Show file tree
Hide file tree
Showing 27 changed files with 513 additions and 201 deletions.
Expand Up @@ -129,9 +129,11 @@ private ContentAddressableStorageFutureStub casFutureStub() {
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}

private ByteStreamStub bsAsyncStub() {
private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
return ByteStreamGrpc.newStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
new NetworkTimeInterceptor(context::getNetworkTime))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -283,7 +285,8 @@ public void uploadActionResult(
}

@Override
public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
public ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
Expand All @@ -295,23 +298,23 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
out = digestOut;
}

return downloadBlob(digest, out, digestSupplier);
return downloadBlob(context, digest, out, digestSupplier);
}

private ListenableFuture<Void> downloadBlob(
Digest digest, OutputStream out, @Nullable Supplier<Digest> digestSupplier) {
Context ctx = Context.current();
RemoteActionExecutionContext context,
Digest digest,
OutputStream out,
@Nullable Supplier<Digest> digestSupplier) {
AtomicLong offset = new AtomicLong(0);
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
ListenableFuture<Void> downloadFuture =
Utils.refreshIfUnauthenticatedAsync(
() ->
retrier.executeAsync(
() ->
ctx.call(
() ->
requestRead(
offset, progressiveBackoff, digest, out, digestSupplier)),
requestRead(
context, offset, progressiveBackoff, digest, out, digestSupplier),
progressiveBackoff),
callCredentialsProvider);

Expand All @@ -331,14 +334,15 @@ public static String getResourceName(String instanceName, Digest digest) {
}

private ListenableFuture<Void> requestRead(
RemoteActionExecutionContext context,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<Digest> digestSupplier) {
String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
bsAsyncStub(context)
.read(
ReadRequest.newBuilder()
.setResourceName(resourceName)
Expand Down
Expand Up @@ -33,6 +33,9 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
Expand Down Expand Up @@ -66,15 +69,17 @@ class RemoteActionInputFetcher implements ActionInputPrefetcher {
@GuardedBy("lock")
final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();

private final String buildRequestId;
private final String commandId;
private final RemoteCache remoteCache;
private final Path execRoot;
private final RequestMetadata requestMetadata;

RemoteActionInputFetcher(
RemoteCache remoteCache, Path execRoot, RequestMetadata requestMetadata) {
String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
this.commandId = Preconditions.checkNotNull(commandId);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
this.execRoot = Preconditions.checkNotNull(execRoot);
this.requestMetadata = Preconditions.checkNotNull(requestMetadata);
}

/**
Expand Down Expand Up @@ -160,13 +165,15 @@ private ListenableFuture<Void> downloadFileAsync(Path path, FileArtifactValue me

ListenableFuture<Void> download = downloadsInProgress.get(path);
if (download == null) {
Context ctx =
TracingMetadataUtils.contextWithMetadata(
requestMetadata.toBuilder().setActionId(metadata.getActionId()).build());
RequestMetadata requestMetadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId());
RemoteActionExecutionContext remoteActionExecutionContext =
new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime());
Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata);
Context prevCtx = ctx.attach();
try {
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
download = remoteCache.downloadFile(path, digest);
download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest);
downloadsInProgress.put(path, download);
Futures.addCallback(
download,
Expand Down
40 changes: 25 additions & 15 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Expand Up @@ -261,14 +261,15 @@ public static void waitForBulkTransfer(
* @return a future that completes after the download completes (succeeds / fails). If successful,
* the content is stored in the future's {@code byte[]}.
*/
public ListenableFuture<byte[]> downloadBlob(Digest digest) {
public ListenableFuture<byte[]> downloadBlob(
RemoteActionExecutionContext context, Digest digest) {
if (digest.getSizeBytes() == 0) {
return EMPTY_BYTES;
}
ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
SettableFuture<byte[]> outerF = SettableFuture.create();
Futures.addCallback(
cacheProtocol.downloadBlob(digest, bOut),
cacheProtocol.downloadBlob(context, digest, bOut),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
Expand Down Expand Up @@ -305,12 +306,13 @@ private static Path toTmpDownloadPath(Path actualPath) {
* @throws ExecException in case clean up after a failed download failed.
*/
public void download(
RemoteActionExecutionContext context,
ActionResult result,
Path execRoot,
FileOutErr origOutErr,
OutputFilesLocker outputFilesLocker)
throws ExecException, IOException, InterruptedException {
ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);
ActionResultMetadata metadata = parseActionResultMetadata(context, result, execRoot);

List<ListenableFuture<FileMetadata>> downloads =
Stream.concat(
Expand All @@ -321,7 +323,7 @@ public void download(
(file) -> {
try {
ListenableFuture<Void> download =
downloadFile(toTmpDownloadPath(file.path()), file.digest());
downloadFile(context, toTmpDownloadPath(file.path()), file.digest());
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
Expand All @@ -337,7 +339,7 @@ public void download(
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));
downloads.addAll(downloadOutErr(context, result, tmpOutErr));

try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
Expand Down Expand Up @@ -449,7 +451,8 @@ private void createSymlinks(Iterable<SymlinkMetadata> symlinks) throws IOExcepti
}

/** Downloads a file (that is not a directory). The content is fetched from the digest. */
public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOException {
public ListenableFuture<Void> downloadFile(
RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
Expand All @@ -472,7 +475,7 @@ public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOEx

OutputStream out = new LazyFileOutputStream(path);
SettableFuture<Void> outerF = SettableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
Futures.addCallback(
f,
new FutureCallback<Void>() {
Expand Down Expand Up @@ -509,7 +512,8 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
private List<ListenableFuture<FileMetadata>> downloadOutErr(
RemoteActionExecutionContext context, ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
try {
Expand All @@ -521,7 +525,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
cacheProtocol.downloadBlob(
context, result.getStdoutDigest(), outErr.getOutputStream()),
(d) -> null,
directExecutor()));
}
Expand All @@ -535,7 +540,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
cacheProtocol.downloadBlob(
context, result.getStderrDigest(), outErr.getErrorStream()),
(d) -> null,
directExecutor()));
}
Expand All @@ -549,6 +555,7 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
* <p>This method only downloads output directory metadata, stdout and stderr as well as the
* contents of {@code inMemoryOutputPath} if specified.
*
* @param context the context this action running with
* @param result the action result metadata of a successfully executed action (exit code = 0).
* @param outputs the action's declared output files
* @param inMemoryOutputPath the path of an output file whose contents should be returned in
Expand All @@ -564,6 +571,7 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
*/
@Nullable
public InMemoryOutput downloadMinimal(
RemoteActionExecutionContext context,
String actionId,
ActionResult result,
Collection<? extends ActionInput> outputs,
Expand All @@ -579,7 +587,7 @@ public InMemoryOutput downloadMinimal(

ActionResultMetadata metadata;
try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) {
metadata = parseActionResultMetadata(result, execRoot);
metadata = parseActionResultMetadata(context, result, execRoot);
}

if (!metadata.symlinks().isEmpty()) {
Expand Down Expand Up @@ -614,9 +622,10 @@ public InMemoryOutput downloadMinimal(
try (SilentCloseable c = Profiler.instance().profile("Remote.download")) {
ListenableFuture<byte[]> inMemoryOutputDownload = null;
if (inMemoryOutput != null) {
inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest);
inMemoryOutputDownload = downloadBlob(context, inMemoryOutputDigest);
}
waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true);
waitForBulkTransfer(
downloadOutErr(context, result, outErr), /* cancelRemainingOnInterrupt=*/ true);
if (inMemoryOutputDownload != null) {
waitForBulkTransfer(
ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
Expand Down Expand Up @@ -708,7 +717,8 @@ private DirectoryMetadata parseDirectory(
return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build());
}

private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult, Path execRoot)
private ActionResultMetadata parseActionResultMetadata(
RemoteActionExecutionContext context, ActionResult actionResult, Path execRoot)
throws IOException, InterruptedException {
Preconditions.checkNotNull(actionResult, "actionResult");
Map<Path, ListenableFuture<Tree>> dirMetadataDownloads =
Expand All @@ -717,7 +727,7 @@ private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult
dirMetadataDownloads.put(
execRoot.getRelative(dir.getPath()),
Futures.transform(
downloadBlob(dir.getTreeDigest()),
downloadBlob(context, dir.getTreeDigest()),
(treeBytes) -> {
try {
return Tree.parseFrom(treeBytes);
Expand Down
Expand Up @@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.DigestFunction;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.auth.Credentials;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -512,9 +511,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
requestContext,
remoteOptions.remoteInstanceName));

Context repoContext =
TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "repository_rule");

if (enableRemoteExecution) {
RemoteExecutionClient remoteExecutor;
if (remoteOptions.remoteExecutionKeepalive) {
Expand Down Expand Up @@ -550,7 +546,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
buildRequestId,
invocationId,
"repository_rule",
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
} else {
Expand Down Expand Up @@ -579,10 +574,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (enableRemoteDownloader) {
remoteDownloaderSupplier.set(
new GrpcRemoteDownloader(
buildRequestId,
invocationId,
downloaderChannel.retain(),
Optional.ofNullable(credentials),
retrier,
repoContext,
cacheClient,
remoteOptions));
downloaderChannel.release();
Expand Down Expand Up @@ -855,14 +851,12 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
if (!remoteOutputsMode.downloadAllOutputs()) {
RequestMetadata requestMetadata =
RequestMetadata.newBuilder()
.setCorrelatedInvocationsId(env.getBuildRequestId())
.setToolInvocationId(env.getCommandId().toString())
.build();
actionInputFetcher =
new RemoteActionInputFetcher(
actionContextProvider.getRemoteCache(), env.getExecRoot(), requestMetadata);
env.getBuildRequestId(),
env.getCommandId().toString(),
actionContextProvider.getRemoteCache(),
env.getExecRoot());
builder.setActionInputPrefetcher(actionInputFetcher);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
}
Expand Down

0 comments on commit 75bd1ff

Please sign in to comment.