From 071e84fb88fdd272f35ac67d32ed5e5dba8e3044 Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Wed, 23 Apr 2025 21:53:29 -0500 Subject: [PATCH 1/2] http: limit concurrent file downloads --- .../itzg/helpers/http/ConcurrencyLimiter.java | 41 ++++++ .../itzg/helpers/http/FetchBuilderBase.java | 4 + .../me/itzg/helpers/http/SharedFetch.java | 9 +- .../me/itzg/helpers/http/SharedFetchArgs.java | 8 ++ .../http/SpecificFileFetchBuilder.java | 128 ++++++++++-------- 5 files changed, 131 insertions(+), 59 deletions(-) create mode 100644 src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java new file mode 100644 index 00000000..5306cd1c --- /dev/null +++ b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java @@ -0,0 +1,41 @@ +package me.itzg.helpers.http; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ConcurrencyLimiter { + + private final Semaphore semaphore; + + public ConcurrencyLimiter(int concurrency) { + this.semaphore = new Semaphore(concurrency); + } + + public Mono limit(Mono source) { + return Mono.using( + () -> { + semaphore.acquire(); + return Boolean.TRUE; + }, + r -> source, + r -> semaphore.release() + ) + .subscribeOn(Schedulers.boundedElastic()); + } + + public Mono limit() { + return Mono.fromFuture(CompletableFuture.runAsync(() -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + } + + public void release() { + semaphore.release(); + } +} diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java index 543d5ec0..c52e8c47 100644 --- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java +++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java @@ -142,6 +142,10 @@ protected URI uri() { return state.uri; } + protected ConcurrencyLimiter getConcurrencyLimiter() { + return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter() : null; + } + public Set getAcceptContentTypes() { return state.acceptContentTypes; } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java index 72c36c93..228d0e0f 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetch.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java @@ -33,6 +33,8 @@ public class SharedFetch implements AutoCloseable { private final HttpClient reactiveClient; + private final ConcurrencyLimiter concurrencyLimiter; + public SharedFetch(String forCommand, Options options) { final String userAgent = String.format("%s/%s/%s (cmd=%s)", "itzg", @@ -74,6 +76,8 @@ public SharedFetch(String forCommand, Options options) { ); headers.put("x-fetch-session", fetchSessionId); + + concurrencyLimiter = new ConcurrencyLimiter(options.getConcurrentFileDownloads()); } public FetchBuilderBase fetch(URI uri) { @@ -116,6 +120,9 @@ public static class Options { @Default private final Duration pendingAcquireTimeout = Duration.ofSeconds(120); + @Default + private final int concurrentFileDownloads = 10; + private final Map extraHeaders; public Options withHeader(String key, String value) { @@ -124,7 +131,7 @@ public Options withHeader(String key, String value) { newHeaders.put(key, value); return new Options( - responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, + responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads, newHeaders ); } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java index 6ca4d75b..e64461b3 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java @@ -48,6 +48,14 @@ public void setPendingAcquireTimeout(Duration timeout) { optionsBuilder.pendingAcquireTimeout(timeout); } + @Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}", + paramLabel = "COUNT", + description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}" + ) + public void setConcurrentFileDownloads(int concurrentFileDownloads) { + optionsBuilder.concurrentFileDownloads(concurrentFileDownloads); + } + public Options options() { return optionsBuilder.build(); } diff --git a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java index 000aa411..58ce3e5e 100644 --- a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java @@ -5,6 +5,7 @@ import static java.lang.System.currentTimeMillis; import static java.util.Objects.requireNonNull; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; import java.net.URI; @@ -17,6 +18,8 @@ import me.itzg.helpers.errors.GenericException; import me.itzg.helpers.files.ReactiveFileUtils; import reactor.core.publisher.Mono; +import reactor.netty.ByteBufFlux; +import reactor.netty.http.client.HttpClientResponse; @Slf4j @Accessors(fluent = true) @@ -65,69 +68,78 @@ public Mono assemble() { final boolean useIfModifiedSince = skipUpToDate && Files.exists(file); return useReactiveClient(client -> - client - .doOnRequest((httpClientRequest, connection) -> - statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file) - ) - .headers(headers -> { - if (useIfModifiedSince) { - try { - final FileTime lastModifiedTime; - lastModifiedTime = Files.getLastModifiedTime(file); - headers.set( - IF_MODIFIED_SINCE, - httpDateTimeFormatter.format(lastModifiedTime.toInstant()) + getConcurrencyLimiter().limit( + client + .doOnRequest((httpClientRequest, connection) -> + statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file) + ) + .headers(headers -> + setupHeaders(headers, useIfModifiedSince) + ) + .followRedirect(true) + .doOnRequest(debugLogRequest(log, "file fetch")) + .get() + .uri(uri) + .response((resp, byteBufFlux) -> + processResponse(resp, byteBufFlux, useIfModifiedSince, uri) + ) + .last() + .contextWrite(context -> context.put("downloadStart", currentTimeMillis())) + ) + ); + } + + private Mono processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) { + final HttpResponseStatus status = resp.status(); + + if (useIfModifiedSince && status == NOT_MODIFIED) { + log.debug("The file {} is already up to date", file); + statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file); + return Mono.just(file); + } + + if (notSuccess(resp)) { + return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file"); + } + + if (notExpectedContentType(resp)) { + return failedContentTypeMono(resp); + } + + return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file) + .flatMap(fileSize -> { + statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); + downloadedHandler.call(uri, file, fileSize); + return Mono + .deferContextual(contextView -> { + if (log.isDebugEnabled()) { + final long durationMillis = + currentTimeMillis() - contextView.get("downloadStart"); + log.debug("Download of {} took {} at {}", + uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize) ); - } catch (IOException e) { - throw new GenericException("Unable to get last modified time of " + file, e); } + return Mono.just(file); + }); + }); + } - } + private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) { + if (useIfModifiedSince) { + try { + final FileTime lastModifiedTime; + lastModifiedTime = Files.getLastModifiedTime(file); + headers.set( + IF_MODIFIED_SINCE, + httpDateTimeFormatter.format(lastModifiedTime.toInstant()) + ); + } catch (IOException e) { + throw new GenericException("Unable to get last modified time of " + file, e); + } - applyHeaders(headers); - }) - .followRedirect(true) - .doOnRequest(debugLogRequest(log, "file fetch")) - .get() - .uri(uri) - .response((resp, byteBufFlux) -> { - final HttpResponseStatus status = resp.status(); + } - if (useIfModifiedSince && status == NOT_MODIFIED) { - log.debug("The file {} is already up to date", file); - statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file); - return Mono.just(file); - } - - if (notSuccess(resp)) { - return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file"); - } - - if (notExpectedContentType(resp)) { - return failedContentTypeMono(resp); - } - - return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file) - .flatMap(fileSize -> { - statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); - downloadedHandler.call(uri, file, fileSize); - return Mono - .deferContextual(contextView -> { - if (log.isDebugEnabled()) { - final long durationMillis = - currentTimeMillis() - contextView.get("downloadStart"); - log.debug("Download of {} took {} at {}", - uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize) - ); - } - return Mono.just(file); - }); - }); - - }) - .last() - .contextWrite(context -> context.put("downloadStart", currentTimeMillis())) - ); + applyHeaders(headers); } } From 6a941927135ff32e08a7c29b5e3c7653a9e717e3 Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Wed, 23 Apr 2025 22:02:39 -0500 Subject: [PATCH 2/2] Use noop limiter when not using shared fetch --- .../itzg/helpers/http/ConcurrencyLimiter.java | 43 ++++--------------- .../helpers/http/ConcurrencyLimiterImpl.java | 27 ++++++++++++ .../itzg/helpers/http/FetchBuilderBase.java | 3 +- .../me/itzg/helpers/http/SharedFetch.java | 2 +- 4 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java index 5306cd1c..cd0fee98 100644 --- a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java +++ b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java @@ -1,41 +1,16 @@ package me.itzg.helpers.http; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Semaphore; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -public class ConcurrencyLimiter { +@FunctionalInterface +public interface ConcurrencyLimiter { - private final Semaphore semaphore; + ConcurrencyLimiter NOOP_LIMITER = new ConcurrencyLimiter() { + @Override + public Mono limit(Mono source) { + return source; + } + }; - public ConcurrencyLimiter(int concurrency) { - this.semaphore = new Semaphore(concurrency); - } - - public Mono limit(Mono source) { - return Mono.using( - () -> { - semaphore.acquire(); - return Boolean.TRUE; - }, - r -> source, - r -> semaphore.release() - ) - .subscribeOn(Schedulers.boundedElastic()); - } - - public Mono limit() { - return Mono.fromFuture(CompletableFuture.runAsync(() -> { - try { - semaphore.acquire(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - })); - } - - public void release() { - semaphore.release(); - } + Mono limit(Mono source); } diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java new file mode 100644 index 00000000..ea44ff4b --- /dev/null +++ b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java @@ -0,0 +1,27 @@ +package me.itzg.helpers.http; + +import java.util.concurrent.Semaphore; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ConcurrencyLimiterImpl implements ConcurrencyLimiter { + + private final Semaphore semaphore; + + public ConcurrencyLimiterImpl(int concurrency) { + this.semaphore = new Semaphore(concurrency); + } + + @Override + public Mono limit(Mono source) { + return Mono.using( + () -> { + semaphore.acquire(); + return Boolean.TRUE; + }, + r -> source, + r -> semaphore.release() + ) + .subscribeOn(Schedulers.boundedElastic()); + } +} diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java index c52e8c47..218a2655 100644 --- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java +++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java @@ -143,7 +143,8 @@ protected URI uri() { } protected ConcurrencyLimiter getConcurrencyLimiter() { - return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter() : null; + return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter() + : ConcurrencyLimiter.NOOP_LIMITER; } public Set getAcceptContentTypes() { diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java index 228d0e0f..6d1ae723 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetch.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java @@ -77,7 +77,7 @@ public SharedFetch(String forCommand, Options options) { headers.put("x-fetch-session", fetchSessionId); - concurrencyLimiter = new ConcurrencyLimiter(options.getConcurrentFileDownloads()); + concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads()); } public FetchBuilderBase fetch(URI uri) {