Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class InstallFabricLoaderCommand implements Callable<Integer> {
@Option(names = "--results-file", description = ResultsFileWriter.OPTION_DESCRIPTION, paramLabel = "FILE")
Path resultsFile;

@Option(names = "--force-reinstall", description = "Force reinstall of the loader even if it already exists")
boolean forceReinstall;

@ArgGroup
OriginOptions originOptions = new OriginOptions();

Expand Down Expand Up @@ -85,7 +88,8 @@ else if (VERSION_PATTERN.matcher(value).matches()) {
@Override
public Integer call() throws Exception {
final FabricLauncherInstaller installer = new FabricLauncherInstaller(outputDirectory)
.setResultsFile(resultsFile);
.setResultsFile(resultsFile)
.setForceReinstall(forceReinstall);

if (originOptions.fromUri != null) {
installer.installUsingUri(sharedFetchArgs.options(), originOptions.fromUri);
Expand Down
49 changes: 27 additions & 22 deletions src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package me.itzg.helpers.files;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
Expand Down Expand Up @@ -39,34 +41,37 @@ public static Mono<Path> createDirectories(Path dir) {
.subscribeOn(Schedulers.boundedElastic());
}

@SuppressWarnings("BlockingMethodInNonBlockingContext")
public static Mono<Long> copyByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
public static Mono<Long> writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
return Mono.fromCallable(() -> {
log.trace("Opening {} for writing", file);
return Files.newByteChannel(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
return FileChannel.open(file,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
}
)
.flatMap(outChannel -> byteBufFlux.asByteBuffer()
.flatMap(byteBuffer ->
Mono.fromCallable(() -> {
final int count = outChannel.write(byteBuffer);
log.trace("Wrote {} bytes to {}", count, file);
return count;
}
)
)
.doOnTerminate(() -> {
try {
log.trace("Closing file for writing: {}", file);
outChannel.close();
} catch (IOException e) {
log.error("Failed to close file for writing: {}", file, e);
}
})
.collect(Collectors.<Integer>summingLong(value -> value))
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.boundedElastic())
.flatMap(outChannel ->
byteBufFlux
.asByteBuffer()
.subscribeOn(Schedulers.boundedElastic())
.<Integer>handle((byteBuffer, sink) -> {
try {
sink.next(outChannel.write(byteBuffer));
} catch (IOException e) {
sink.error(Exceptions.propagate(e));
}
})
.doOnTerminate(() -> {
try {
outChannel.close();
log.trace("Closed {}", file);
} catch (IOException e) {
log.warn("Failed to close {}", file, e);
}
})
.collect(Collectors.<Integer>summingLong(value -> value))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
import static java.util.Objects.requireNonNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;

import org.jetbrains.annotations.NotNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.files.ReactiveFileUtils;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
Expand Down Expand Up @@ -244,7 +242,7 @@ else if (skipUpToDate) {
private Mono<Path> copyBodyInputStreamToFile(ByteBufFlux byteBufFlux, Path outputFile) {
log.trace("Copying response body to file={}", outputFile);

return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, outputFile)
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, outputFile)
.map(fileSize -> {
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
downloadedHandler.call(uri(), outputFile, fileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Mono<Path> assemble() {
return failedContentTypeMono(resp);
}

return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, file)
.flatMap(fileSize -> {
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
downloadedHandler.call(uri, file, fileSize);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ private Mono<Path> processRemoteSource(String source) {
.toDirectory(dest)
.skipUpToDate(skipUpToDate)
.skipExisting(skipExisting)
.handleDownloaded((downloaded, uri, size) -> {
log.debug("Downloaded {} from {} ({} bytes)", downloaded, uri, size);
})
.handleStatus((status, uri, file) -> {
switch (status) {
case DOWNLOADING:
Expand All @@ -240,7 +243,6 @@ private Mono<Path> processRemoteSource(String source) {
log.info("The file {} already exists", file);
break;
case DOWNLOADED:
log.debug("Finished downloading to file={}", file);
break;
}
})
Expand Down