Skip to content

Commit

Permalink
Add quarkus.rest-client.multipart.max-chunk-size property
Browse files Browse the repository at this point in the history
  • Loading branch information
ejba committed Sep 20, 2023
1 parent 7903aa7 commit 3e39566
Show file tree
Hide file tree
Showing 23 changed files with 215 additions and 23 deletions.
Expand Up @@ -43,6 +43,8 @@ public class RestClientConfig {
EMPTY.connectionPoolSize = Optional.empty();
EMPTY.keepAliveEnabled = Optional.empty();
EMPTY.maxRedirects = Optional.empty();
EMPTY.multipart = new RestClientMultipartConfig();
EMPTY.multipart.maxChunkSize = Optional.empty();
EMPTY.headers = Collections.emptyMap();
EMPTY.shared = Optional.empty();
EMPTY.name = Optional.empty();
Expand All @@ -51,6 +53,8 @@ public class RestClientConfig {
EMPTY.alpn = Optional.empty();
}

public RestClientMultipartConfig multipart;

/**
* The base URL to use for this service. This property or the `uri` property is considered required, unless
* the `baseUri` attribute is configured in the `@RegisterRestClient` annotation.
Expand Down Expand Up @@ -296,6 +300,9 @@ public static RestClientConfig load(String configKey) {
instance.userAgent = getConfigValue(configKey, "user-agent", String.class);
instance.http2 = getConfigValue(configKey, "http2", Boolean.class);

instance.multipart = new RestClientMultipartConfig();
instance.multipart.maxChunkSize = getConfigValue(configKey, "multipart.max-chunk-size", Integer.class);

return instance;
}

Expand Down Expand Up @@ -333,6 +340,9 @@ public static RestClientConfig load(Class<?> interfaceClass) {
instance.http2 = getConfigValue(interfaceClass, "http2", Boolean.class);
instance.alpn = getConfigValue(interfaceClass, "alpn", Boolean.class);

instance.multipart = new RestClientMultipartConfig();
instance.multipart.maxChunkSize = getConfigValue(interfaceClass, "multipart.max-chunk-size", Integer.class);

return instance;
}

Expand Down
@@ -0,0 +1,19 @@
package io.quarkus.restclient.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

@ConfigGroup
public class RestClientMultipartConfig {

/**
* The max HTTP chunk size (8096 bytes by default).
*
* This property is applicable to reactive REST clients only.
*/
@ConfigItem
public Optional<Integer> maxChunkSize;

}
Expand Up @@ -102,6 +102,8 @@ public class RestClientsConfig {

public RestClientLoggingConfig logging;

public RestClientMultipartConfig multipart;

/**
* A timeout in milliseconds that REST clients should wait to connect to the remote endpoint.
*
Expand Down
Expand Up @@ -72,6 +72,7 @@ private void verifyConfig(RestClientConfig config) {
assertThat(config.connectionTTL.get()).isEqualTo(30000);
assertThat(config.connectionPoolSize).isPresent();
assertThat(config.connectionPoolSize.get()).isEqualTo(10);
assertThat(config.multipart.maxChunkSize.get()).isEqualTo(1024);
}

private static void setupMPConfig() throws IOException {
Expand Down
Expand Up @@ -10,6 +10,7 @@ quarkus.rest-client.test-client.query-param-style=COMMA_SEPARATED
quarkus.rest-client.test-client.hostname-verifier=io.quarkus.restclient.configuration.MyHostnameVerifier
quarkus.rest-client.test-client.connection-ttl=30000
quarkus.rest-client.test-client.connection-pool-size=10
quarkus.rest-client.test-client.multipart.max-chunk-size=1024

quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".url=http://localhost:8080
quarkus.rest-client."RestClientConfigTest".uri=http://localhost:8081
Expand All @@ -23,3 +24,4 @@ quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".query-pa
quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".hostname-verifier=io.quarkus.restclient.configuration.MyHostnameVerifier
quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".connection-ttl=30000
quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".connection-pool-size=10
quarkus.rest-client."io.quarkus.restclient.config.RestClientConfigTest".multipart.max-chunk-size=1024
Expand Up @@ -18,6 +18,9 @@
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.RestClientBuilder;
Expand Down Expand Up @@ -63,6 +66,21 @@ void shouldUploadBiggishFile() {
assertThat(result).isEqualTo("myFile");
}

@Test
void shouldChunkRequest() {
Client client = RestClientBuilder.newBuilder()
.baseUri(baseUri)
.property("io.quarkus.rest.client.max-chunk-size", 2)
.build(Client.class);

ClientForm form = new ClientForm();
form.file = Multi.createBy().repeating().supplier(
() -> (byte) 'A').atMost(10);

String result = client.chunked(form);
assertThat(result).isEqualTo("myFile/-1/chunked");
}

@Test
void shouldUploadTwoSmallFiles() {
Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class);
Expand Down Expand Up @@ -180,6 +198,15 @@ public String uploadNull(@MultipartForm FormDataWithFile form) {
return form.myFile != null ? "NON_NULL_FILE_FROM_NULL_MULTI" : "NULL_FILE";
}

@Path("/chunked")
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String chunked(@Context HttpHeaders headers, @MultipartForm FormData form) {
String filename = verifyFile(form.myFile, 10, b -> (byte) 'A');
return filename + "/" + headers.getLength() + "/" + headers.getHeaderString("transfer-encoding");
}

@Path("/")
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
Expand Down Expand Up @@ -269,6 +296,11 @@ public interface Client {
@Consumes(MediaType.MULTIPART_FORM_DATA)
String postMultipart(@MultipartForm ClientForm clientForm);

@Path("/chunked")
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
String chunked(@MultipartForm ClientForm clientForm);

@Path("/two-files")
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
Expand Down
@@ -0,0 +1,5 @@
package io.quarkus.rest.client.reactive.runtime;

public class Constants {
public final static int DEFAULT_MAX_CHUNK_SIZE = 8096;
}
@@ -1,5 +1,7 @@
package io.quarkus.rest.client.reactive.runtime;

import static io.quarkus.rest.client.reactive.runtime.Constants.DEFAULT_MAX_CHUNK_SIZE;

import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -360,6 +362,13 @@ public <T> T build(Class<T> aClass) throws IllegalStateException, RestClientDefi
clientBuilder.setUserAgent(restClientsConfig.userAgent.get());
}

Integer maxChunkSize = (Integer) getConfiguration().getProperty(QuarkusRestClientProperties.MAX_CHUNK_SIZE);
if (maxChunkSize != null) {
clientBuilder.maxChunkSize(maxChunkSize);
} else {
clientBuilder.maxChunkSize(DEFAULT_MAX_CHUNK_SIZE);
}

if (getConfiguration().hasProperty(QuarkusRestClientProperties.HTTP2)) {
clientBuilder.http2((Boolean) getConfiguration().getProperty(QuarkusRestClientProperties.HTTP2));
} else if (restClientsConfig.http2) {
Expand Down
@@ -1,5 +1,7 @@
package io.quarkus.rest.client.reactive.runtime;

import static io.quarkus.rest.client.reactive.runtime.Constants.DEFAULT_MAX_CHUNK_SIZE;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -120,6 +122,10 @@ private void configureCustomProperties(QuarkusRestClientBuilder builder) {
builder.property(QuarkusRestClientProperties.USER_AGENT, userAgent.get());
}

Optional<Integer> maxChunkSize = oneOf(clientConfigByClassName().multipart.maxChunkSize,
clientConfigByConfigKey().multipart.maxChunkSize, configRoot.multipart.maxChunkSize);
builder.property(QuarkusRestClientProperties.MAX_CHUNK_SIZE, maxChunkSize.orElse(DEFAULT_MAX_CHUNK_SIZE));

Boolean http2 = oneOf(clientConfigByClassName().http2,
clientConfigByConfigKey().http2).orElse(configRoot.http2);
builder.property(QuarkusRestClientProperties.HTTP2, http2);
Expand Down
Expand Up @@ -29,6 +29,7 @@

import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.quarkus.restclient.config.RestClientConfig;
import io.quarkus.restclient.config.RestClientMultipartConfig;
import io.quarkus.restclient.config.RestClientsConfig;

@SuppressWarnings({ "SameParameterValue" })
Expand Down Expand Up @@ -103,6 +104,7 @@ public void testClientSpecificConfigs() {
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.CONNECTION_POOL_SIZE, 103);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.KEEP_ALIVE_ENABLED, false);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.MAX_REDIRECTS, 104);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.MAX_CHUNK_SIZE, 1024);
Mockito.verify(restClientBuilderMock).followRedirects(true);
Mockito.verify(restClientBuilderMock).register(MyResponseFilter1.class);
Mockito.verify(restClientBuilderMock).queryParamStyle(QueryParamStyle.COMMA_SEPARATED);
Expand Down Expand Up @@ -145,6 +147,7 @@ public void testGlobalConfigs() {
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.CONNECTION_POOL_SIZE, 203);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.KEEP_ALIVE_ENABLED, true);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.MAX_REDIRECTS, 204);
Mockito.verify(restClientBuilderMock).property(QuarkusRestClientProperties.MAX_CHUNK_SIZE, 1024);
Mockito.verify(restClientBuilderMock).followRedirects(true);
Mockito.verify(restClientBuilderMock).register(MyResponseFilter2.class);
Mockito.verify(restClientBuilderMock).queryParamStyle(QueryParamStyle.MULTI_PAIRS);
Expand Down Expand Up @@ -173,6 +176,8 @@ private static RestClientsConfig createSampleConfigRoot() {
configRoot.connectionPoolSize = Optional.of(203);
configRoot.keepAliveEnabled = Optional.of(true);
configRoot.maxRedirects = Optional.of(204);
configRoot.multipart = new RestClientMultipartConfig();
configRoot.multipart.maxChunkSize = Optional.of(1024);
configRoot.followRedirects = Optional.of(true);
configRoot.providers = Optional
.of("io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilderTest$MyResponseFilter2");
Expand Down Expand Up @@ -211,6 +216,8 @@ private static RestClientConfig createSampleClientConfig() {
clientConfig.keepAliveEnabled = Optional.of(false);
clientConfig.maxRedirects = Optional.of(104);
clientConfig.followRedirects = Optional.of(true);
clientConfig.multipart = new RestClientMultipartConfig();
clientConfig.multipart.maxChunkSize = Optional.of(1024);
clientConfig.providers = Optional
.of("io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilderTest$MyResponseFilter1");
clientConfig.queryParamStyle = Optional.of(QueryParamStyle.COMMA_SEPARATED);
Expand Down
Expand Up @@ -2,6 +2,11 @@

public class QuarkusRestClientProperties {

/**
* Configures the maximum chunk size.
*/
public static final String MAX_CHUNK_SIZE = "io.quarkus.rest.client.max-chunk-size";

/**
* Configure the connect timeout in ms.
*/
Expand Down
Expand Up @@ -70,9 +70,11 @@ public class ClientSendRequestHandler implements ClientRestHandler {
private final LoggingScope loggingScope;
private final ClientLogger clientLogger;
private final Map<Class<?>, MultipartResponseData> multipartResponseDataMap;
private final int maxChunkSize;

public ClientSendRequestHandler(boolean followRedirects, LoggingScope loggingScope, ClientLogger logger,
public ClientSendRequestHandler(int maxChunkSize, boolean followRedirects, LoggingScope loggingScope, ClientLogger logger,
Map<Class<?>, MultipartResponseData> multipartResponseDataMap) {
this.maxChunkSize = maxChunkSize;
this.followRedirects = followRedirects;
this.loggingScope = loggingScope;
this.clientLogger = logger;
Expand Down Expand Up @@ -457,7 +459,7 @@ private QuarkusMultipartFormUpload setMultipartHeadersAndPrepareBody(HttpClientR
mode = (PausableHttpPostRequestEncoder.EncoderMode) property;
}
QuarkusMultipartFormUpload multipartFormUpload = new QuarkusMultipartFormUpload(Vertx.currentContext(), multipartForm,
true, mode);
true, maxChunkSize, mode);
httpClientRequest.setChunked(multipartFormUpload.isChunked());
setEntityRelatedHeaders(headerMap, state.getEntity());

Expand Down
Expand Up @@ -71,6 +71,7 @@ public class ClientBuilderImpl extends ClientBuilder {

private LoggingScope loggingScope;
private Integer loggingBodySize = 100;
private int maxChunkSize = 8096;
private MultiQueryParamMode multiQueryParamMode;

private ClientLogger clientLogger = new DefaultClientLogger();
Expand Down Expand Up @@ -196,6 +197,11 @@ public ClientBuilder enableCompression() {
return this;
}

public ClientBuilder maxChunkSize(int maxChunkSize) {
this.maxChunkSize = maxChunkSize;
return this;
}

@Override
public ClientImpl build() {
HttpClientOptions options = Optional.ofNullable(configuration.getFromContext(HttpClientOptions.class))
Expand Down Expand Up @@ -287,6 +293,7 @@ public ClientImpl build() {

clientLogger.setBodySize(loggingBodySize);

options.setMaxChunkSize(maxChunkSize);
return new ClientImpl(options,
configuration,
CLIENT_CONTEXT_RESOLVER.resolve(Thread.currentThread().getContextClassLoader()),
Expand Down
Expand Up @@ -201,7 +201,8 @@ public Vertx get() {
});
}

handlerChain = new HandlerChain(followRedirects, loggingScope, clientContext.getMultipartResponsesData(), clientLogger);
handlerChain = new HandlerChain(options.getMaxChunkSize(), followRedirects, loggingScope,
clientContext.getMultipartResponsesData(), clientLogger);
}

public ClientContext getClientContext() {
Expand Down
Expand Up @@ -36,11 +36,12 @@ class HandlerChain {

private ClientRestHandler preClientSendHandler = null;

public HandlerChain(boolean followRedirects, LoggingScope loggingScope,
public HandlerChain(int maxChunkSize, boolean followRedirects, LoggingScope loggingScope,
Map<Class<?>, MultipartResponseData> multipartData, ClientLogger clientLogger) {
this.clientCaptureCurrentContextRestHandler = new ClientCaptureCurrentContextRestHandler();
this.clientSwitchToRequestContextRestHandler = new ClientSwitchToRequestContextRestHandler();
this.clientSendHandler = new ClientSendRequestHandler(followRedirects, loggingScope, clientLogger, multipartData);
this.clientSendHandler = new ClientSendRequestHandler(maxChunkSize, followRedirects, loggingScope, clientLogger,
multipartData);
this.clientSetResponseEntityRestHandler = new ClientSetResponseEntityRestHandler();
this.clientResponseCompleteRestHandler = new ClientResponseCompleteRestHandler();
this.clientErrorHandler = new ClientErrorHandler(loggingScope);
Expand Down

0 comments on commit 3e39566

Please sign in to comment.