Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .run/modrinth-modpack (slug).run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
<option name="ENABLED" value="true" />
</pattern>
</extension>
<extension name="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ else if (Manifests.allFilesPresent(outputDir, context.prevInstallManifest, ignor
}
}
else {
log.info("Downloading modpack zip for {}", modFile.getDisplayName());

modpackZip = context.cfApi.downloadTemp(modFile, ".zip",
(status, uri, file) ->
log.debug("Modpack file retrieval: status={} uri={} file={}", status, uri, file)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/me/itzg/helpers/fabric/FabricMetaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private static void debugDownloadedContent(Path path) {
log.debug("Downloaded launcher jar content starts with: {}",
Hex.encodeHexString(ByteBuffer.wrap(buf, 0, amount))
);
} catch (IOException e) {
} catch (IOException|IndexOutOfBoundsException e) {
log.warn("Failed to debug content of launcher jar", e);
}
}
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/me/itzg/helpers/files/ByteBufQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package me.itzg.helpers.files;

import io.netty.buffer.ByteBuf;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ByteBufQueue {

final Lock lock = new ReentrantLock();
final Condition readyOrFinished = lock.newCondition();
final LinkedList<ByteBuf> buffers = new LinkedList<>();
boolean finished = false;

public void add(ByteBuf buf) {
lock.lock();
try {
buffers.add(buf);
} finally {
readyOrFinished.signal();
lock.unlock();
}
}

public ByteBuf take() {
while (true) {
lock.lock();

try {
if (!buffers.isEmpty()) {
return buffers.removeFirst();
}
else if (finished) {
return null;
}
readyOrFinished.awaitUninterruptibly();
} finally {
lock.unlock();
}
}
}

public void finish() {
lock.lock();
try {
finished = true;
readyOrFinished.signal();
} finally {
lock.unlock();
}
}
}
68 changes: 39 additions & 29 deletions src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package me.itzg.helpers.files;

import java.io.IOException;
import io.netty.buffer.ByteBuf;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
import reactor.util.function.Tuple2;

@Slf4j
public class ReactiveFileUtils {
Expand Down Expand Up @@ -42,36 +42,46 @@ public static Mono<Path> createDirectories(Path dir) {
}

public static Mono<Long> writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
return Mono.fromCallable(() -> {
log.trace("Opening {} for writing", file);
return FileChannel.open(file,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
}
)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(outChannel ->
byteBufFlux
.asByteBuffer()
.subscribeOn(Schedulers.boundedElastic())
.<Integer>handle((byteBuffer, sink) -> {
try {
sink.next(outChannel.write(byteBuffer));
} catch (IOException e) {
sink.error(Exceptions.propagate(e));
final ByteBufQueue byteBufQueue = new ByteBufQueue();

// Separate this into a pair of concurrent mono's
return Mono.zip(
// ...file writer
Mono.fromCallable(() -> {
try (FileChannel channel = FileChannel.open(file,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
)) {
ByteBuf byteBuf;
while ((byteBuf = byteBufQueue.take()) != null) {
try {
//noinspection ResultOfMethodCallIgnored
channel.write(byteBuf.nioBuffer());
} finally {
byteBuf.release();
}
}

return file;
}
})
.doOnTerminate(() -> {
try {
outChannel.close();
log.trace("Closed {}", file);
} catch (IOException e) {
log.warn("Failed to close {}", file, e);
}
// ...which runs in a separate thread
.subscribeOn(Schedulers.boundedElastic()),
// ...and the network consumer flux
byteBufFlux
// Mark the bytebufs as retained so they can be released after
// they are written by the mono above
.retain()
.map(byteBuf -> {
final int amount = byteBuf.readableBytes();
byteBufQueue.add(byteBuf);
return amount;
})
.doOnTerminate(byteBufQueue::finish)
.collect(Collectors.<Integer>summingLong(value -> value))
);
)
// Just expose the total bytes read from network
.map(Tuple2::getT2);
}
}
40 changes: 28 additions & 12 deletions src/main/java/me/itzg/helpers/http/SharedFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.McImageHelper;
import me.itzg.helpers.errors.GenericException;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
Expand Down Expand Up @@ -60,8 +61,6 @@ public SharedFetch(String forCommand, Options options) {

reactiveClient = HttpClient.create(connectionProvider)
.proxyWithSystemProperties()
// https://projectreactor.io/docs/netty/release/reference/http-client.html#HTTP2
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2)
.headers(headers -> {
headers
.set(HttpHeaderNames.USER_AGENT.toString(), userAgent)
Expand All @@ -72,15 +71,29 @@ public SharedFetch(String forCommand, Options options) {
}
)
// Reference https://projectreactor.io/docs/netty/release/reference/index.html#response-timeout
.responseTimeout(options.getResponseTimeout())
.secure(spec ->
// Http2 SSL supports both HTTP/2 and HTTP/1.1
spec.sslContext((GenericSslContextSpec<?>) Http2SslContextSpec.forClient())
// Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
.handshakeTimeout(options.getTlsHandshakeTimeout())
)

;
.responseTimeout(options.getResponseTimeout());

if (options.isUseHttp2()) {
log.debug("Using HTTP/2");
reactiveClient
// https://projectreactor.io/docs/netty/release/reference/http-client.html#HTTP2
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2)
.secure(spec ->
// Http2 SSL supports both HTTP/2 and HTTP/1.1
spec.sslContext((GenericSslContextSpec<?>) Http2SslContextSpec.forClient())
// Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
.handshakeTimeout(options.getTlsHandshakeTimeout())
);
}
else {
log.debug("Using HTTP/1.1");
reactiveClient
.secure(spec ->
spec.sslContext((GenericSslContextSpec<?>) Http11SslContextSpec.forClient())
// Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
.handshakeTimeout(options.getTlsHandshakeTimeout())
);
}

headers.put("x-fetch-session", fetchSessionId);

Expand Down Expand Up @@ -135,14 +148,17 @@ public static class Options {
*/
private final URI filesViaUrl;

@Default
private final boolean useHttp2 = true;

public Options withHeader(String key, String value) {
final Map<String, String> newHeaders = extraHeaders != null ?
new HashMap<>(extraHeaders) : new HashMap<>();
newHeaders.put(key, value);

return new Options(
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
newHeaders, filesViaUrl
newHeaders, filesViaUrl, useHttp2
);
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public void setPendingAcquireTimeout(Duration timeout) {
optionsBuilder.pendingAcquireTimeout(timeout);
}

@Option(names = "--use-http2", defaultValue = "${env:FETCH_USE_HTTP2:-true}",
description = "Whether to use HTTP/2. Default: ${DEFAULT-VALUE}"
)
public void setUseHttp2(boolean useHttp2) {
optionsBuilder.useHttp2(useHttp2);
}

public Options options() {
return optionsBuilder.build();
}
Expand Down