diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 6fddca0d..26c05808 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,3 +5,7 @@ updates: directory: "/" schedule: interval: "weekly" + - package-ecosystem: gradle + directory: "/" + schedule: + interval: "weekly" diff --git a/build.gradle b/build.gradle index e4857cb9..6b811462 100644 --- a/build.gradle +++ b/build.gradle @@ -97,7 +97,7 @@ dependencies { implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation 'com.jayway.jsonpath:json-path:2.7.0' implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1' - implementation 'io.projectreactor.netty:reactor-netty-http:1.0.24' + implementation 'io.projectreactor.netty:reactor-netty-http:1.1.1' implementation 'org.apache.maven:maven-artifact:3.8.5' implementation 'commons-codec:commons-codec:1.15' diff --git a/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java b/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java index 6132a43b..cef41302 100644 --- a/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java +++ b/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java @@ -9,7 +9,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,6 +34,9 @@ import me.itzg.helpers.http.SharedFetch; import me.itzg.helpers.http.UriBuilder; import me.itzg.helpers.json.ObjectMappers; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @Slf4j @@ -49,6 +51,13 @@ public class CurseForgeInstaller { @Getter @Setter private String apiBaseUrl = "https://api.curse.tools/v1/cf"; + @Getter + @Setter + private int parallelism = 4; + + @Getter + @Setter + private boolean forceSynchronize; public void install(String slug, String fileMatcher, Integer fileId, Set excludedModIds) throws IOException { requireNonNull(outputDir, "outputDir is required"); @@ -77,12 +86,12 @@ else if (searchResponse.getData().size() > 1) { } } - private void processMod(SharedFetch preparedFetch, UriBuilder uriBuilder, CurseForgeMod mod, Integer fileId, String fileMatcher, + private void processMod(SharedFetch preparedFetch, UriBuilder uriBuilder, CurseForgeMod mod, Integer fileId, + String fileMatcher, Set excludedModIds ) throws IOException { - final CurseForgeFile modFile; if (fileId == null) { @@ -98,13 +107,18 @@ private void processMod(SharedFetch preparedFetch, UriBuilder uriBuilder, CurseF && manifest.getFileId() == modFile.getId() && manifest.getModId() == modFile.getModId() ) { - if (Manifests.allFilesPresent(outputDir, manifest)) { + if (forceSynchronize) { + log.info("Requested force synchronize of {}", modFile.getDisplayName()); + } + else if (Manifests.allFilesPresent(outputDir, manifest)) { log.info("Requested CurseForge modpack {} is already installed for {}", modFile.getDisplayName(), mod.getName() ); return; } - log.warn("Some files from modpack file {} were missing. Proceeding with a re-install", modFile.getFileName()); + else { + log.warn("Some files from modpack file {} were missing. Proceeding with a re-install", modFile.getFileName()); + } } log.info("Processing modpack {} @ {}:{}", modFile.getDisplayName(), modFile.getModId(), modFile.getId()); @@ -130,10 +144,10 @@ private static CurseForgeFile resolveModpackFile(SharedFetch preparedFetch, UriB ) throws IOException { // NOTE latestFiles in mod is only one or two files, so retrieve the full list instead final GetModFilesResponse resp = preparedFetch.fetch( - uriBuilder.resolve("/mods/{modId}/files", mod.getId()) - ) - .toObject(GetModFilesResponse.class) - .execute(); + uriBuilder.resolve("/mods/{modId}/files", mod.getId()) + ) + .toObject(GetModFilesResponse.class) + .execute(); return resp.getData().stream() .filter(file -> @@ -154,12 +168,12 @@ private static URI normalizeDownloadUrl(String downloadUrl) { final String filename = downloadUrl.substring(nameStart + 1); return URI.create( - downloadUrl.substring(0, nameStart+1) + + downloadUrl.substring(0, nameStart + 1) + filename .replace(" ", "%20") .replace("[", "%5B") .replace("]", "%5D") - ); + ); } private List processModpackFile(SharedFetch preparedFetch, UriBuilder uriBuilder, URI downloadUrl, @@ -171,6 +185,9 @@ private List processModpackFile(SharedFetch preparedFetch, UriBuilder uriB try { preparedFetch.fetch(downloadUrl) .toFile(downloaded) + .handleStatus((status, uri, file) -> + log.debug("Modpack file retrieval: status={} uri={} file={}", status, uri, file) + ) .execute(); final MinecraftModpackManifest modpackManifest = extractModpackManifest(downloaded); @@ -182,36 +199,40 @@ private List processModpackFile(SharedFetch preparedFetch, UriBuilder uriB final Path modsDir = Files.createDirectories(outputDir.resolve("mods")); - final List modFiles = modpackManifest.getFiles().stream() + final List modFiles = Flux.fromIterable(modpackManifest.getFiles()) + .parallel(parallelism) + .runOn(Schedulers.newParallel("downloader")) .filter(ManifestFileRef::isRequired) .filter(manifestFileRef -> !excludedModIds.contains(manifestFileRef.getProjectID())) - .map(fileRef -> + .flatMap(fileRef -> downloadModFile(preparedFetch, uriBuilder, modsDir, fileRef.getProjectID(), fileRef.getFileID()) ) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .sequential() + .collectList() + .block(); - final List overrides = applyOverrides(downloaded); + final List overrides = applyOverrides(downloaded, modpackManifest.getOverrides()); prepareModLoader(modLoader.getId(), modpackManifest.getMinecraft().getVersion()); - return Stream.concat(modFiles.stream(), overrides.stream()) + return Stream.concat( + modFiles != null ? modFiles.stream() : Stream.empty(), + overrides.stream() + ) .collect(Collectors.toList()); - } - finally { + } finally { Files.delete(downloaded); } } - private List applyOverrides(Path modpackZip) throws IOException { + private List applyOverrides(Path modpackZip, String overridesDir) throws IOException { final ArrayList overrides = new ArrayList<>(); try (ZipInputStream zip = new ZipInputStream(Files.newInputStream(modpackZip))) { ZipEntry entry; while ((entry = zip.getNextEntry()) != null) { if (!entry.isDirectory()) { - // TODO lookup "overrides" from file model - if (entry.getName().startsWith("overrides/")) { - final String subpath = entry.getName().substring("overrides/".length()); + if (entry.getName().startsWith(overridesDir + "/")) { + final String subpath = entry.getName().substring(overridesDir.length() + 1/*for slash*/); final Path outPath = outputDir.resolve(subpath); log.debug("Applying override {}", subpath); @@ -227,26 +248,36 @@ private List applyOverrides(Path modpackZip) throws IOException { return overrides; } - private Path downloadModFile(SharedFetch preparedFetch, UriBuilder uriBuilder, Path modsDir, int projectID, int fileID) { + private Mono downloadModFile(SharedFetch preparedFetch, UriBuilder uriBuilder, Path modsDir, int projectID, int fileID + ) { try { final CurseForgeFile file = getModFileInfo(preparedFetch, uriBuilder, projectID, fileID); - if (!isServerMod(file)) { - log.debug("Skipping {} since it is a client mod", file.getFileName()); - return null; - } - - log.info("Download/confirm mod {} @ {}:{}", + log.debug("Download/confirm mod {} @ {}:{}", // several mods have non-descriptive display names, like "v1.0.0", so filename tends to be better file.getFileName(), projectID, fileID ); + if (!isServerMod(file)) { + log.debug("Skipping {} since it is a client mod", file.getFileName()); + return Mono.empty(); + } return preparedFetch.fetch( normalizeDownloadUrl(file.getDownloadUrl()) ) - .toDirectory(modsDir) + .toFile(modsDir.resolve(file.getFileName())) .skipExisting(true) - .execute(); + .handleStatus((status, uri, f) -> { + switch (status) { + case SKIP_FILE_EXISTS: + log.info("Mod file {} already exists", outputDir.relativize(f)); + break; + case DOWNLOADED: + log.info("Downloaded mod file {}", outputDir.relativize(f)); + break; + } + }) + .assemble(); } catch (IOException e) { throw new GenericException(String.format("Failed to locate mod file modId=%s fileId=%d", projectID, fileID), e); } diff --git a/src/main/java/me/itzg/helpers/curseforge/InstallCurseForgeCommand.java b/src/main/java/me/itzg/helpers/curseforge/InstallCurseForgeCommand.java index 29eed00e..cebed67c 100644 --- a/src/main/java/me/itzg/helpers/curseforge/InstallCurseForgeCommand.java +++ b/src/main/java/me/itzg/helpers/curseforge/InstallCurseForgeCommand.java @@ -43,6 +43,14 @@ public class InstallCurseForgeCommand implements Callable { description = "Substring to select specific modpack filename") String filenameMatcher; + @Option(names = "--force-synchronize") + boolean forceSynchronize; + + @Option(names = "--parallel-downloads", defaultValue = "4", + description = "Default: ${DEFAULT-VALUE}" + ) + int parallelDownloads; + private static final Pattern PAGE_URL_PATTERN = Pattern.compile( "https://www.curseforge.com/minecraft/modpacks/(?.+?)(/files(/(?\\d+)?)?)?"); @@ -71,7 +79,9 @@ public Integer call() throws Exception { return ExitCode.USAGE; } - final CurseForgeInstaller installer = new CurseForgeInstaller(outputDirectory, resultsFile); + final CurseForgeInstaller installer = new CurseForgeInstaller(outputDirectory, resultsFile) + .setForceSynchronize(forceSynchronize) + .setParallelism(parallelDownloads); installer.install(slug, filenameMatcher, fileId, excludedModIds); return ExitCode.OK; diff --git a/src/main/java/me/itzg/helpers/get/GetCommand.java b/src/main/java/me/itzg/helpers/get/GetCommand.java index 30bd45c5..08508008 100644 --- a/src/main/java/me/itzg/helpers/get/GetCommand.java +++ b/src/main/java/me/itzg/helpers/get/GetCommand.java @@ -211,7 +211,11 @@ null, new JsonPathOutputHandler( .toFile(outputFile) .skipUpToDate(skipUpToDate) .acceptContentTypes(acceptContentTypes) - .logProgressEach(logProgressEach) + .handleDownloaded((uri, f, contentSizeBytes) -> { + if (logProgressEach) { + log.info("Downloaded {}", f); + } + }) .execute(); if (this.outputFilename) { stdout.println(file); diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java index c830606a..78328f49 100644 --- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java +++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java @@ -80,6 +80,10 @@ protected HttpHead head(boolean withConfigure) throws IOException { return request; } + protected URI uri() { + return state.uri; + } + public List getAcceptContentTypes() { return state.acceptContentTypes; } diff --git a/src/main/java/me/itzg/helpers/http/FileDownloadStatus.java b/src/main/java/me/itzg/helpers/http/FileDownloadStatus.java new file mode 100644 index 00000000..c13a5b4a --- /dev/null +++ b/src/main/java/me/itzg/helpers/http/FileDownloadStatus.java @@ -0,0 +1,8 @@ +package me.itzg.helpers.http; + +public enum FileDownloadStatus { + SKIP_FILE_EXISTS, + SKIP_FILE_UP_TO_DATE, + DOWNLOADING, + DOWNLOADED +} diff --git a/src/main/java/me/itzg/helpers/http/FileDownloadStatusHandler.java b/src/main/java/me/itzg/helpers/http/FileDownloadStatusHandler.java new file mode 100644 index 00000000..733c6ecc --- /dev/null +++ b/src/main/java/me/itzg/helpers/http/FileDownloadStatusHandler.java @@ -0,0 +1,10 @@ +package me.itzg.helpers.http; + +import java.net.URI; +import java.nio.file.Path; + +@FunctionalInterface +public interface FileDownloadStatusHandler { + + void call(FileDownloadStatus status, URI uri, Path file); +} diff --git a/src/main/java/me/itzg/helpers/http/FileDownloadedHandler.java b/src/main/java/me/itzg/helpers/http/FileDownloadedHandler.java new file mode 100644 index 00000000..6aa77f08 --- /dev/null +++ b/src/main/java/me/itzg/helpers/http/FileDownloadedHandler.java @@ -0,0 +1,10 @@ +package me.itzg.helpers.http; + +import java.net.URI; +import java.nio.file.Path; + +@FunctionalInterface +public interface FileDownloadedHandler { + + void call(URI uri, Path file, long contentSizeBytes); +} diff --git a/src/main/java/me/itzg/helpers/http/FilenameExtractor.java b/src/main/java/me/itzg/helpers/http/FilenameExtractor.java index 4b302c9f..4718e5ea 100644 --- a/src/main/java/me/itzg/helpers/http/FilenameExtractor.java +++ b/src/main/java/me/itzg/helpers/http/FilenameExtractor.java @@ -2,6 +2,8 @@ import java.io.IOException; import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; import org.apache.hc.client5.http.HttpResponseException; import org.apache.hc.core5.http.ClassicHttpResponse; @@ -13,12 +15,29 @@ @Slf4j public class FilenameExtractor { + private static final Pattern HTTP_CONTENT_DISPOSITION = + Pattern.compile("(?inline|attachment)(\\s*;\\s+filename=\"(?.+?)\")?"); + private final LatchingUrisInterceptor interceptor; public FilenameExtractor(LatchingUrisInterceptor interceptor) { this.interceptor = Objects.requireNonNull(interceptor, "interceptor is required"); } + static String filenameFromContentDisposition(String headerValue) { + if (headerValue == null) { + return null; + } + + final Matcher m = HTTP_CONTENT_DISPOSITION.matcher(headerValue); + if (m.matches()) { + if (m.group("type").equals("attachment")) { + return m.group("filename"); + } + } + return null; + } + public String extract(ClassicHttpResponse response) throws IOException, ProtocolException { // Same as AbstractHttpClientResponseHandler if (response.getCode() >= HttpStatus.SC_REDIRECTION) { diff --git a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java index 7b118f5a..71968835 100644 --- a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java @@ -1,14 +1,20 @@ package me.itzg.helpers.http; +import static java.util.Objects.requireNonNull; + +import io.netty.handler.codec.http.HttpHeaderNames; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.function.Function; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import org.apache.hc.client5.http.classic.methods.HttpGet; -import org.apache.hc.client5.http.classic.methods.HttpHead; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.client.HttpClientResponse; @Slf4j @Accessors(fluent = true) @@ -17,6 +23,8 @@ public class OutputToDirectoryFetchBuilder extends FetchBuilderBase {}; + private FileDownloadedHandler downloadedHandler = (uri, file, contentSizeBytes) -> {}; protected OutputToDirectoryFetchBuilder(State state, Path outputDirectory) { super(state); @@ -27,24 +35,78 @@ protected OutputToDirectoryFetchBuilder(State state, Path outputDirectory) { this.outputDirectory = outputDirectory; } - public Path execute() throws IOException { - return usePreparedFetch(preparedFetch -> { - final CloseableHttpClient client = preparedFetch.getClient(); + @SuppressWarnings("unused") + public OutputToDirectoryFetchBuilder handleStatus(FileDownloadStatusHandler statusHandler) { + requireNonNull(statusHandler); + this.statusHandler = statusHandler; + return self(); + } - final HttpHead headReq = head(false); - final String filename = client.execute(headReq, new DeriveFilenameHandler( - preparedFetch.getLatchingUrisInterceptor() - )); + @SuppressWarnings("unused") + public OutputToDirectoryFetchBuilder handleDownloaded(FileDownloadedHandler downloadedHandler) { + requireNonNull(downloadedHandler); + this.downloadedHandler = downloadedHandler; + return self(); + } - final Path outputFile = outputDirectory.resolve(filename); - if (skipExisting && Files.exists(outputFile)) { - log.debug("File {} already exists", outputFile); - return outputFile; - } + public Path execute() throws IOException { + return assemble() + .block(); + } - final HttpGet getReq = get(); - return client.execute(getReq, new SaveToFileHandler(outputFile, false)); + public Mono assemble() throws IOException { + return usePreparedFetch(sharedFetch -> + sharedFetch.getReactiveClient() + .followRedirect(true) + .head() + .uri(uri()) + .response() + .map(OutputToDirectoryFetchBuilder::extractFilename) + .map(outputDirectory::resolve) + .flatMap(outputFile -> { + if (skipExisting && Files.exists(outputFile)) { + log.debug("File {} already exists", outputFile); + statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri(), outputFile); + return Mono.just(outputFile); + } - }); + return sharedFetch.getReactiveClient() + .doOnRequest((httpClientRequest, connection) -> { + statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), null); + }) + .followRedirect(true) + .get() + .uri(uri()) + .responseContent().aggregate() + .asInputStream() + .publishOn(Schedulers.boundedElastic()) + .flatMap((Function>) inputStream -> { + try { + final long size = Files.copy(inputStream, outputFile, StandardCopyOption.REPLACE_EXISTING); + statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile); + downloadedHandler.call(uri(), outputFile, size); + return Mono.just(outputFile); + } catch (IOException e) { + return Mono.error(e); + } + }); + }) + ); } + + private static String extractFilename(HttpClientResponse resp) { + final String contentDisposition = resp.responseHeaders().get(HttpHeaderNames.CONTENT_DISPOSITION); + final String dispositionFilename = FilenameExtractor.filenameFromContentDisposition(contentDisposition); + if (dispositionFilename != null) { + return dispositionFilename; + } + if (resp.redirectedFrom().length > 0) { + final String lastUrl = resp.redirectedFrom()[resp.redirectedFrom().length - 1]; + final int pos = lastUrl.lastIndexOf('/'); + return lastUrl.substring(pos + 1); + } + final int pos = resp.path().lastIndexOf('/'); + return resp.path().substring(pos + 1); + } + } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java index 98653f1d..b513e06f 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetch.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java @@ -1,5 +1,6 @@ package me.itzg.helpers.http; +import io.netty.handler.codec.http.HttpHeaderNames; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -13,6 +14,7 @@ import me.itzg.helpers.get.ExtendedRequestRetryStrategy; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; +import reactor.netty.http.client.HttpClient; /** * Provides an efficient way to make multiple web requests since a single client @@ -25,6 +27,7 @@ @Slf4j public class SharedFetch implements AutoCloseable { + @Getter private final CloseableHttpClient client; @Getter @@ -32,6 +35,9 @@ public class SharedFetch implements AutoCloseable { @Getter final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor(); + @Getter + private final HttpClient reactiveClient; + public SharedFetch(String forCommand) { this(forCommand, 5, 2); } @@ -43,6 +49,11 @@ public SharedFetch(String forCommand, int retryCount, int retryDelaySeconds) { forCommand != null ? forCommand : "unspecified" ); + reactiveClient = HttpClient.create() + .headers(headers -> + headers.set(HttpHeaderNames.USER_AGENT.toString(), userAgent) + ); + this.client = HttpClients.custom() .addRequestInterceptorFirst((request, entity, context) -> { try { @@ -73,10 +84,6 @@ public SharedFetch addHeader(String name, String value) { return this; } - CloseableHttpClient getClient() { - return this.client; - } - @Override public void close() throws IOException { client.close(); diff --git a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java index 45d6d935..d81462d4 100644 --- a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java @@ -1,25 +1,36 @@ package me.itzg.helpers.http; +import static io.netty.handler.codec.http.HttpHeaderNames.*; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static java.util.Objects.requireNonNull; + import java.io.IOException; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.nio.file.attribute.FileTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.hc.core5.http.HttpHeaders; -import org.apache.hc.core5.http.io.HttpClientResponseHandler; -import org.apache.hc.core5.http.message.BasicHttpRequest; +import me.itzg.helpers.errors.GenericException; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @Slf4j public class SpecificFileFetchBuilder extends FetchBuilderBase { + private final static DateTimeFormatter httpDateTimeFormatter = DateTimeFormatter.RFC_1123_DATE_TIME.withZone(ZoneId.of("GMT")); + private FileDownloadStatusHandler statusHandler = (status, uri, p) -> { + }; + private FileDownloadedHandler downloadedHandler = (uri, p, contentSizeBytes) -> { + }; + private final Path file; private boolean skipUpToDate; - private boolean logProgressEach = false; - private HttpClientResponseHandler handler; private boolean skipExisting; public SpecificFileFetchBuilder(State state, Path file) { @@ -37,38 +48,102 @@ public SpecificFileFetchBuilder skipExisting(boolean skipExisting) { return self(); } - public SpecificFileFetchBuilder logProgressEach(boolean logProgressEach) { - this.logProgressEach = logProgressEach; + public SpecificFileFetchBuilder handleStatus(FileDownloadStatusHandler handler) { + requireNonNull(handler); + this.statusHandler = handler; + return self(); + } + + public SpecificFileFetchBuilder handleDownloaded(FileDownloadedHandler handler) { + requireNonNull(handler); + this.downloadedHandler = handler; return self(); } public Path execute() throws IOException { + return assemble() + .block(); + } + + public Mono assemble() throws IOException { + final URI uri = uri(); + if (skipExisting && Files.exists(file)) { log.debug("File already exists and skip requested"); - return file; + statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri, file); + return Mono.just(file); } - return useClient(client -> client.execute(get(), handler)); - } + final boolean useIfModifiedSince = skipUpToDate && Files.exists(file); - @Override - protected void configureRequest(BasicHttpRequest request) throws IOException { - super.configureRequest(request); + return usePreparedFetch(sharedFetch -> + sharedFetch.getReactiveClient() + .doOnRequest((httpClientRequest, connection) -> + statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file) + ) + .headers(headers -> { + if (useIfModifiedSince) { + try { + final FileTime lastModifiedTime; + lastModifiedTime = Files.getLastModifiedTime(file); + headers.set( + IF_MODIFIED_SINCE.toString(), + httpDateTimeFormatter.format(lastModifiedTime.toInstant()) + ); + } catch (IOException e) { + throw new GenericException("Unable to get last modified time of " + file, e); + } - final SaveToFileHandler handler = new SaveToFileHandler(file, logProgressEach); - handler.setExpectedContentTypes(getAcceptContentTypes()); + } + final List contentTypes = getAcceptContentTypes(); + if (contentTypes != null && !contentTypes.isEmpty()) { + headers.set( + ACCEPT.toString(), + contentTypes + ); + } + }) + .followRedirect(true) + .get() + .uri(uri) + .responseSingle((httpClientResponse, bodyMono) -> { + if (useIfModifiedSince + && httpClientResponse.status().equals(NOT_MODIFIED)) { + return Mono.just(file); + } - if (skipUpToDate && Files.exists(file)) { - final FileTime lastModifiedTime = Files.getLastModifiedTime(file); - request.addHeader(HttpHeaders.IF_MODIFIED_SINCE, - httpDateTimeFormatter.format(lastModifiedTime.toInstant()) - ); + final List contentTypes = getAcceptContentTypes(); + if (contentTypes != null && !contentTypes.isEmpty()) { + final List respTypes = httpClientResponse.responseHeaders() + .getAll(CONTENT_TYPE); + if (respTypes.stream() + .noneMatch(contentTypes::contains)) { + return Mono.error(new GenericException("Unexpected content type in response")); + } + } - // wrap the handler to intercept the NotModified response - this.handler = new NotModifiedHandler(file, handler, logProgressEach); - } - else { - this.handler = handler; - } + return bodyMono.asInputStream() + .publishOn(Schedulers.boundedElastic()) + .flatMap(inputStream -> { + try { + @SuppressWarnings("BlockingMethodInNonBlockingContext") // see publishOn above + final long size = Files.copy(inputStream, file, StandardCopyOption.REPLACE_EXISTING); + statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); + downloadedHandler.call(uri, file, size); + } catch (IOException e) { + return Mono.error(e); + }/* finally { + try { + //noinspection BlockingMethodInNonBlockingContext + inputStream.close(); + } catch (IOException e) { + log.warn("Unable to close body input stream", e); + } + }*/ + return Mono.just(file); + }); + }) + ); } + }