Skip to content

Commit

Permalink
Use CompletableFuture in JarCache (#715)
Browse files Browse the repository at this point in the history
  • Loading branch information
basil committed Jan 8, 2024
1 parent 94ce994 commit b6a2c27
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 36 deletions.
3 changes: 2 additions & 1 deletion src/main/java/hudson/remoting/JarCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.CompletableFuture;

/**
* Jar file cache.
Expand Down Expand Up @@ -55,5 +56,5 @@ public abstract class JarCache {
* URL of the jar file.
*/
@NonNull
public abstract Future<URL> resolve(@NonNull Channel channel, long sum1, long sum2) throws IOException, InterruptedException;
public abstract CompletableFuture<URL> resolve(@NonNull Channel channel, long sum1, long sum2) throws IOException, InterruptedException;
}
21 changes: 11 additions & 10 deletions src/main/java/hudson/remoting/JarCacheSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -21,7 +22,7 @@ public abstract class JarCacheSupport extends JarCache {
/**
* Remember in-progress jar file resolution to avoid retrieving the same jar file twice.
*/
private final ConcurrentMap<Checksum,Future<URL>> inprogress = new ConcurrentHashMap<>();
private final ConcurrentMap<Checksum,CompletableFuture<URL>> inprogress = new ConcurrentHashMap<>();

/**
* Look up the local cache and return URL if found.
Expand All @@ -45,11 +46,11 @@ public abstract class JarCacheSupport extends JarCache {

@Override
@NonNull
public Future<URL> resolve(@NonNull final Channel channel, final long sum1, final long sum2) throws IOException, InterruptedException {
public CompletableFuture<URL> resolve(@NonNull final Channel channel, final long sum1, final long sum2) throws IOException, InterruptedException {
URL jar = lookInCache(channel,sum1, sum2);
if (jar!=null) {
// already in the cache
return new AsyncFutureImpl<>(jar);
return CompletableFuture.completedFuture(jar);
}

final Checksum key = new Checksum(sum1, sum2);
Expand All @@ -58,8 +59,8 @@ public Future<URL> resolve(@NonNull final Channel channel, final long sum1, fina

@NonNull
@SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", justification = "API compatibility")
private Future<URL> submitDownload(Channel channel, long sum1, long sum2, Checksum key) {
final AsyncFutureImpl<URL> promise = new AsyncFutureImpl<>();
private CompletableFuture<URL> submitDownload(Channel channel, long sum1, long sum2, Checksum key) {
final CompletableFuture<URL> promise = new CompletableFuture<>();
downloader.submit(new DownloadRunnable(channel, sum1, sum2, key, promise));
return promise;
}
Expand All @@ -70,9 +71,9 @@ private class DownloadRunnable implements Runnable {
final long sum1;
final long sum2;
final Checksum key;
final AsyncFutureImpl<URL> promise;
final CompletableFuture<URL> promise;

public DownloadRunnable(Channel channel, long sum1, long sum2, Checksum key, AsyncFutureImpl<URL> promise) {
public DownloadRunnable(Channel channel, long sum1, long sum2, Checksum key, CompletableFuture<URL> promise) {
this.channel = channel;
this.sum1 = sum1;
this.sum2 = sum2;
Expand All @@ -85,7 +86,7 @@ public void run() {
try {
URL url = retrieve(channel, sum1, sum2);
inprogress.remove(key);
promise.set(url);
promise.complete(url);
} catch (ChannelClosedException | RequestAbortedException e) {
// the connection was killed while we were still resolving the file
bailout(e);
Expand All @@ -101,7 +102,7 @@ public void run() {
} else {
// in other general failures, we aren't retrying
// TODO: or should we?
promise.set(e);
promise.completeExceptionally(e);
LOGGER.log(Level.WARNING, String.format("Failed to resolve a jar %016x%016x", sum1, sum2), e);
}
}
Expand All @@ -112,7 +113,7 @@ public void run() {
*/
private void bailout(Throwable e) {
inprogress.remove(key); // this lets another thread to retry later
promise.set(e); // then tell those who are waiting that we aborted
promise.completeExceptionally(e); // then tell those who are waiting that we aborted
}
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/hudson/remoting/RemoteClassLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/hudson/remoting/ResourceImageBoth.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/hudson/remoting/ResourceImageDirect.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -35,12 +37,12 @@ class ResourceImageDirect extends ResourceImageRef {
@Override
Future<byte[]> resolve(Channel channel, String resourcePath) throws IOException, InterruptedException {
LOGGER.log(Level.FINE, resourcePath+" image is direct");
return new AsyncFutureImpl<>(payload);
return CompletableFuture.completedFuture(payload);
}

@Override
Future<URLish> resolveURL(Channel channel, String resourcePath) throws IOException, InterruptedException {
return new AsyncFutureImpl<>(URLish.from(makeResource(resourcePath, payload)));
return CompletableFuture.completedFuture(URLish.from(makeResource(resourcePath, payload)));
}

private static final Logger LOGGER = Logger.getLogger(ResourceImageDirect.class.getName());
Expand Down
47 changes: 24 additions & 23 deletions src/main/java/hudson/remoting/ResourceImageInJar.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* {@link ResourceImageRef} that points to a resource inside a jar file.
Expand Down Expand Up @@ -42,31 +45,29 @@ class ResourceImageInJar extends ResourceImageRef {

@Override
Future<byte[]> resolve(Channel channel, final String resourcePath) throws IOException, InterruptedException {
return new FutureAdapter<>(_resolveJarURL(channel)) {
@Override
@SuppressFBWarnings(value = "URLCONNECTION_SSRF_FD", justification = "This is only used for managing the jar cache as files.")
protected byte[] adapt(URL jar) throws ExecutionException {
try {
return Util.readFully(toResourceURL(jar, resourcePath).openStream());
} catch (IOException e) {
throw new ExecutionException(e);
}
}
};
return _resolveJarURL(channel).thenApply(jar -> readContents(jar, resourcePath));
}

@SuppressFBWarnings(value = "URLCONNECTION_SSRF_FD", justification = "This is only used for managing the jar cache as files.")
private byte[] readContents(URL jar, String resourcePath) {
try (InputStream in = toResourceURL(jar, resourcePath).openStream()) {
return Util.readFully(in);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
Future<URLish> resolveURL(Channel channel, final String resourcePath) throws IOException, InterruptedException {
return new FutureAdapter<>(_resolveJarURL(channel)) {
@Override
protected URLish adapt(URL jar) throws ExecutionException {
try {
return URLish.from(toResourceURL(jar, resourcePath));
} catch (IOException e) {
throw new ExecutionException(e);
}
}
};
return _resolveJarURL(channel).thenApply(jar -> getUrlish(jar, resourcePath));
}

private URLish getUrlish(URL jar, String resourcePath) {
try {
return URLish.from(toResourceURL(jar, resourcePath));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@NonNull
Expand All @@ -83,7 +84,7 @@ open jar file (see sun.net.www.protocol.jar.JarURLConnection.JarURLInputStream.c
return new URL("jar:"+ jar +"!/"+resourcePath);
}

Future<URL> _resolveJarURL(Channel channel) throws IOException, InterruptedException {
CompletableFuture<URL> _resolveJarURL(Channel channel) throws IOException, InterruptedException {
JarCache c = channel.getJarCache();
if (c == null) {
throw new IOException(String.format("Failed to resolve a jar %016x%016x. JAR Cache is disabled for the channel %s",
Expand Down
1 change: 1 addition & 0 deletions src/main/java/hudson/remoting/ResourceImageRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Future;

/**
* Wire protocol data representation that encapsulates the access to a resource inside a {@link ClassLoader}.
Expand Down

0 comments on commit b6a2c27

Please sign in to comment.