Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 149 additions & 110 deletions src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
import static java.util.Objects.requireNonNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;

import org.jetbrains.annotations.NotNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.files.ReactiveFileUtils;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
Expand All @@ -33,6 +35,9 @@ public class OutputToDirectoryFetchBuilder extends FetchBuilderBase<OutputToDire
@Setter
private boolean skipUpToDate;

@Setter
private boolean useRemoteName;

private FileDownloadStatusHandler statusHandler = (status, uri, file) -> {
};
private FileDownloadedHandler downloadedHandler = (uri, file, contentSizeBytes) -> {
Expand Down Expand Up @@ -63,12 +68,15 @@ public OutputToDirectoryFetchBuilder handleDownloaded(FileDownloadedHandler down

public Path execute() throws IOException {
return assemble()
.block();
.block();
}

public Mono<Path> assemble() {
return useReactiveClient(client ->
client
if (useRemoteName) {
return assembleWithRemoteName();
}

return useReactiveClient(client -> client
.headers(this::applyHeaders)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file head fetch"))
Expand All @@ -87,35 +95,74 @@ public Mono<Path> assemble() {
final Instant lastModified = respLastModified(resp);

return assembleFileDownload(client, outputFile,
lastModified,
resp.resourceUrl()
);
lastModified,
resp.resourceUrl());
})
.subscribeOn(Schedulers.boundedElastic())
.checkpoint("Fetch HEAD of requested file")
);
.checkpoint("Fetch HEAD of requested file"));
}

private Mono<Path> assembleWithRemoteName() {
return useReactiveClient(client -> {
final String path = uri().getPath();
final int pos = path.lastIndexOf('/');
String filename = path.substring(pos + 1);

// Strip query parameters and fragments from the filename
int queryPos = filename.indexOf('?');
if (queryPos > 0) {
filename = filename.substring(0, queryPos);
}

int fragmentPos = filename.indexOf('#');
if (fragmentPos > 0) {
filename = filename.substring(0, fragmentPos);
}

final Path outputFile = outputDirectory.resolve(filename);

return client
.headers(this::applyHeaders)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file get fetch"))
.get()
.uri(uri())
.response((resp, byteBufFlux) -> {
if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Getting file");
}

statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);

return skipExisting(resp, outputFile)
.flatMap(skip -> skip ? Mono.just(outputFile)
: copyBodyInputStreamToFile(byteBufFlux, outputFile));
})
.last()
.subscribeOn(Schedulers.boundedElastic())
.checkpoint("Direct file download with remote name");
});
}

private Mono<Path> assembleFileDownloadNameViaGet(HttpClient client) {
return client
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file get fetch"))
.get()
.uri(uri())
.response((resp, byteBufFlux) -> {
if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Getting file");
}

final Path outputFile = outputDirectory.resolve(extractFilename(resp));
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);

return skipExisting(resp, outputFile)
.flatMap(skip -> skip ? Mono.just(outputFile)
: copyBodyInputStreamToFile(byteBufFlux, outputFile)
);
})
.last();
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file get fetch"))
.get()
.uri(uri())
.response((resp, byteBufFlux) -> {
if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Getting file");
}

final Path outputFile = outputDirectory.resolve(extractFilename(resp));
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);

return skipExisting(resp, outputFile)
.flatMap(skip -> skip ? Mono.just(outputFile)
: copyBodyInputStreamToFile(byteBufFlux, outputFile));
})
.last();
}

private Instant respLastModified(HttpClientResponse resp) {
Expand All @@ -129,125 +176,117 @@ private Instant respLastModified(HttpClientResponse resp) {
*/
private Mono<Boolean> skipExisting(HttpClientResponse resp, @NotNull Path outputFile) {
return ReactiveFileUtils.fileExists(outputFile)
.flatMap(exists -> {
if (!exists) {
return Mono.just(false);
}

if (skipExisting && !skipUpToDate) {
log.debug("The file {} already exists", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri(), outputFile);
return Mono.just(true);
}
else if (skipUpToDate) {

return ReactiveFileUtils.getLastModifiedTime(outputFile)
.mapNotNull(outputLastModified -> {
final Instant headerLastModified = respLastModified(resp);
if (isFileUpToDate(outputLastModified, headerLastModified)) {
log.debug("The file={} lastModified={} is already up to date compared to response={}",
outputFile, outputLastModified, headerLastModified
);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return true;
}
.flatMap(exists -> {
if (!exists) {
return Mono.just(false);
}

return false;
});
}
if (skipExisting && !skipUpToDate) {
log.debug("The file {} already exists", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri(), outputFile);
return Mono.just(true);
} else if (skipUpToDate) {

return ReactiveFileUtils.getLastModifiedTime(outputFile)
.mapNotNull(outputLastModified -> {
final Instant headerLastModified = respLastModified(resp);
if (isFileUpToDate(outputLastModified, headerLastModified)) {
log.debug(
"The file={} lastModified={} is already up to date compared to response={}",
outputFile, outputLastModified, headerLastModified);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return true;
}

return false;
});
}

return Mono.just(false);
});
return Mono.just(false);
});

}

private static boolean isFileUpToDate(Instant fileLastModified, Instant headerLastModified) {
return headerLastModified != null && headerLastModified.compareTo(fileLastModified) <= 0;
}

private Mono<Path> assembleFileDownload(HttpClient client, Path outputFile, Instant headerLastModified, String resourceUrl) {
private Mono<Path> assembleFileDownload(HttpClient client, Path outputFile, Instant headerLastModified,
String resourceUrl) {
final Mono<Instant> fileLastModifiedMono = ReactiveFileUtils.getLastModifiedTime(outputFile);

final Mono<Boolean> alreadyUpToDateMono;
if (skipExisting && !skipUpToDate) {

alreadyUpToDateMono = ReactiveFileUtils.fileExists(outputFile)
.doOnNext(exists -> {
if (exists) {
log.debug("The file {} already exists", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri(), outputFile);
}
});
}
else if (skipUpToDate) {
.doOnNext(exists -> {
if (exists) {
log.debug("The file {} already exists", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_EXISTS, uri(), outputFile);
}
});
} else if (skipUpToDate) {

alreadyUpToDateMono =
fileLastModifiedMono
alreadyUpToDateMono = fileLastModifiedMono
.map(fileLastModified -> {
final boolean fileUpToDate = isFileUpToDate(fileLastModified, headerLastModified);
if (fileUpToDate) {
log.debug("The file={} lastModified={} is already up to date compared to response={}",
outputFile, fileLastModified, headerLastModified
);
outputFile, fileLastModified, headerLastModified);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
}
return fileUpToDate;
})
.defaultIfEmpty(false);
}
else {
} else {

alreadyUpToDateMono = Mono.just(false);
}

return alreadyUpToDateMono
.filter(alreadyUpToDate -> !alreadyUpToDate)
.flatMap(notUsed -> client
.headers(this::applyHeaders)
.headersWhen(headers ->
skipUpToDate ?
fileLastModifiedMono
.map(outputLastModified -> headers.set(
IF_MODIFIED_SINCE,
httpDateTimeFormatter.format(outputLastModified)
))
.defaultIfEmpty(headers)
: Mono.just(headers)
)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file fetch"))
.doOnRequest(
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
.get()
.uri(resourceUrl)
.response((resp, byteBufFlux) -> {
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return Mono.just(outputFile);
}
.filter(alreadyUpToDate -> !alreadyUpToDate)
.flatMap(notUsed -> client
.headers(this::applyHeaders)
.headersWhen(headers -> skipUpToDate ? fileLastModifiedMono
.map(outputLastModified -> headers.set(
IF_MODIFIED_SINCE,
httpDateTimeFormatter.format(outputLastModified)))
.defaultIfEmpty(headers)
: Mono.just(headers))
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file fetch"))
.doOnRequest(
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING,
uri(), outputFile))
.get()
.uri(resourceUrl)
.response((resp, byteBufFlux) -> {
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return Mono.just(outputFile);
}

if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
}
if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
}

return copyBodyInputStreamToFile(byteBufFlux, outputFile);
})
.last()
.checkpoint("Fetching file into directory")
)
.defaultIfEmpty(outputFile);
return copyBodyInputStreamToFile(byteBufFlux, outputFile);
})
.last()
.checkpoint("Fetching file into directory"))
.defaultIfEmpty(outputFile);
}

private Mono<Path> copyBodyInputStreamToFile(ByteBufFlux byteBufFlux, Path outputFile) {
log.trace("Copying response body to file={}", outputFile);

return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, outputFile)
.map(fileSize -> {
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
downloadedHandler.call(uri(), outputFile, fileSize);
return outputFile;
});
.map(fileSize -> {
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
downloadedHandler.call(uri(), outputFile, fileSize);
return outputFile;
});
}

private String extractFilename(HttpClientResponse resp) {
Expand Down
Loading