From f1814abf830b86f2ee0590f490bac03fc22aa7ca Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Sat, 17 May 2025 20:30:37 -0500 Subject: [PATCH 1/2] http: adjust ReactiveFileUtils.writeByteBufFluxToFile --- .../itzg/helpers/files/ReactiveFileUtils.java | 49 ++++++++++--------- .../http/OutputToDirectoryFetchBuilder.java | 10 ++-- .../http/SpecificFileFetchBuilder.java | 2 +- .../itzg/helpers/sync/MulitCopyCommand.java | 4 +- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java b/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java index b8e1c78b..7b2c5783 100644 --- a/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java +++ b/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java @@ -1,12 +1,14 @@ package me.itzg.helpers.files; import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.time.Instant; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import reactor.core.Exceptions; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.netty.ByteBufFlux; @@ -39,34 +41,37 @@ public static Mono createDirectories(Path dir) { .subscribeOn(Schedulers.boundedElastic()); } - @SuppressWarnings("BlockingMethodInNonBlockingContext") - public static Mono copyByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) { + public static Mono writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) { return Mono.fromCallable(() -> { log.trace("Opening {} for writing", file); - return Files.newByteChannel(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE, + return FileChannel.open(file, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING ); } ) - .flatMap(outChannel -> byteBufFlux.asByteBuffer() - .flatMap(byteBuffer -> - Mono.fromCallable(() -> { - final int count = outChannel.write(byteBuffer); - log.trace("Wrote {} bytes to {}", count, file); - return count; - } - ) - ) - .doOnTerminate(() -> { - try { - log.trace("Closing file for writing: {}", file); - outChannel.close(); - } catch (IOException e) { - log.error("Failed to close file for writing: {}", file, e); - } - }) - .collect(Collectors.summingLong(value -> value)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(outChannel -> + byteBufFlux + .asByteBuffer() + .subscribeOn(Schedulers.boundedElastic()) + .handle((byteBuffer, sink) -> { + try { + sink.next(outChannel.write(byteBuffer)); + } catch (IOException e) { + sink.error(Exceptions.propagate(e)); + } + }) + .doOnTerminate(() -> { + try { + outChannel.close(); + log.trace("Closed {}", file); + } catch (IOException e) { + log.warn("Failed to close {}", file, e); + } + }) + .collect(Collectors.summingLong(value -> value)) ); } } diff --git a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java index bd815089..76886d28 100644 --- a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java @@ -4,19 +4,17 @@ import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED; import static java.util.Objects.requireNonNull; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; - -import org.jetbrains.annotations.NotNull; - -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpResponseStatus; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import me.itzg.helpers.files.ReactiveFileUtils; +import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.netty.ByteBufFlux; @@ -244,7 +242,7 @@ else if (skipUpToDate) { private Mono copyBodyInputStreamToFile(ByteBufFlux byteBufFlux, Path outputFile) { log.trace("Copying response body to file={}", outputFile); - return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, outputFile) + return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, outputFile) .map(fileSize -> { statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile); downloadedHandler.call(uri(), outputFile, fileSize); diff --git a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java index 000aa411..4fcc077e 100644 --- a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java @@ -107,7 +107,7 @@ public Mono assemble() { return failedContentTypeMono(resp); } - return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file) + return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, file) .flatMap(fileSize -> { statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); downloadedHandler.call(uri, file, fileSize); diff --git a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java index 55f2905c..97c98d30 100644 --- a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java +++ b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java @@ -228,6 +228,9 @@ private Mono processRemoteSource(String source) { .toDirectory(dest) .skipUpToDate(skipUpToDate) .skipExisting(skipExisting) + .handleDownloaded((downloaded, uri, size) -> { + log.debug("Downloaded {} from {} ({} bytes)", downloaded, uri, size); + }) .handleStatus((status, uri, file) -> { switch (status) { case DOWNLOADING: @@ -240,7 +243,6 @@ private Mono processRemoteSource(String source) { log.info("The file {} already exists", file); break; case DOWNLOADED: - log.debug("Finished downloading to file={}", file); break; } }) From fef326b2c5a1d13e9785ca51fb303e8f221e2238 Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Mon, 26 May 2025 16:46:51 -0500 Subject: [PATCH 2/2] fabric: added --force-reinstall option --- .../me/itzg/helpers/fabric/InstallFabricLoaderCommand.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/me/itzg/helpers/fabric/InstallFabricLoaderCommand.java b/src/main/java/me/itzg/helpers/fabric/InstallFabricLoaderCommand.java index eb9368e2..7a31a5a5 100644 --- a/src/main/java/me/itzg/helpers/fabric/InstallFabricLoaderCommand.java +++ b/src/main/java/me/itzg/helpers/fabric/InstallFabricLoaderCommand.java @@ -28,6 +28,9 @@ public class InstallFabricLoaderCommand implements Callable { @Option(names = "--results-file", description = ResultsFileWriter.OPTION_DESCRIPTION, paramLabel = "FILE") Path resultsFile; + @Option(names = "--force-reinstall", description = "Force reinstall of the loader even if it already exists") + boolean forceReinstall; + @ArgGroup OriginOptions originOptions = new OriginOptions(); @@ -85,7 +88,8 @@ else if (VERSION_PATTERN.matcher(value).matches()) { @Override public Integer call() throws Exception { final FabricLauncherInstaller installer = new FabricLauncherInstaller(outputDirectory) - .setResultsFile(resultsFile); + .setResultsFile(resultsFile) + .setForceReinstall(forceReinstall); if (originOptions.fromUri != null) { installer.installUsingUri(sharedFetchArgs.options(), originOptions.fromUri);