Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

brotli support in HttpClient #2848

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
MonoSink<Connection> sink,
Context currentContext) {
boolean acceptGzip = false;
boolean acceptBrotli = false;
ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null;
SocketAddress proxyAddress = ((ClientTransportConfig<?>) config).proxyProvider() != null ?
((ClientTransportConfig<?>) config).proxyProvider().getSocketAddress().get() : null;
Function<String, String> uriTagValue = null;
if (config instanceof HttpClientConfig) {
acceptGzip = ((HttpClientConfig) config).acceptGzip;
acceptBrotli = ((HttpClientConfig) config).acceptBrotli;
uriTagValue = ((HttpClientConfig) config).uriTagValue;
}
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue);
}

@Override
Expand Down Expand Up @@ -238,6 +240,7 @@ static final class DisposableAcquire
final ConnectionObserver obs;
final ChannelOperations.OnSetup opsFactory;
final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final long pendingAcquireTimeout;
final InstrumentedPool<Connection> pool;
Expand All @@ -254,6 +257,7 @@ static final class DisposableAcquire
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
Expand All @@ -267,6 +271,7 @@ static final class DisposableAcquire
this.obs = obs;
this.opsFactory = opsFactory;
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.pool = pool;
Expand All @@ -283,6 +288,7 @@ static final class DisposableAcquire
this.obs = parent.obs;
this.opsFactory = parent.opsFactory;
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.metricsRecorder = parent.metricsRecorder;
this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
this.pool = parent.pool;
Expand Down Expand Up @@ -412,7 +418,7 @@ public void operationComplete(Future<Http2StreamChannel> future) {
setChannelContext(ch, currentContext());
}
HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())),
opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand Down Expand Up @@ -521,7 +522,13 @@ public final HttpClient baseUrl(String baseUrl) {
* @return a new {@link HttpClient}
*/
public final HttpClient compress(boolean compressionEnabled) {
configuration().headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will remove any custom Accept-Encoding header that was set prior this invocation.
Why do we need to do this? It will be better to preserve the custom headers.

if (compressionEnabled) {
configuration().acceptBrotli = Brotli.isAvailable();
if (configuration().acceptBrotli) {
configuration().headers.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR);
}

if (!configuration().acceptGzip) {
HttpClient dup = duplicate();
HttpHeaders headers = configuration().headers.copy();
Expand All @@ -531,14 +538,15 @@ public final HttpClient compress(boolean compressionEnabled) {
return dup;
}
}
else if (configuration().acceptGzip) {
else if (configuration().acceptGzip || configuration().acceptBrotli) {
HttpClient dup = duplicate();
if (isCompressing(configuration().headers)) {
HttpHeaders headers = configuration().headers.copy();
headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
dup.configuration().headers = headers;
}
dup.configuration().acceptGzip = false;
dup.configuration().acceptBrotli = false;
return dup;
}
return this;
Expand Down Expand Up @@ -1647,7 +1655,8 @@ public final HttpClient wiretap(boolean enable) {
}

static boolean isCompressing(HttpHeaders h) {
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true);
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true)
|| h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR, true);
}

static String reactorNettyVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public String baseUrl() {
public int channelHash() {
int result = super.channelHash();
result = 31 * result + Boolean.hashCode(acceptGzip);
result = 31 * result + Boolean.hashCode(acceptBrotli);
result = 31 * result + Objects.hashCode(decoder);
result = 31 * result + _protocols;
result = 31 * result + Objects.hashCode(sslProvider);
Expand Down Expand Up @@ -220,6 +221,15 @@ public boolean isAcceptGzip() {
return acceptGzip;
}

/**
* Return whether Brotli compression is enabled.
*
* @return whether Brotli compression is enabled
*/
public boolean isAcceptBrotli() {
return acceptBrotli;
}

/**
* Return true if {@code retry once} is disabled, false otherwise.
*
Expand Down Expand Up @@ -332,6 +342,7 @@ public WebsocketClientSpec websocketClientSpec() {
// Protected/Package private write API

boolean acceptGzip;
boolean acceptBrotli;
String baseUrl;
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;
Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>> connector;
Expand Down Expand Up @@ -368,6 +379,7 @@ public WebsocketClientSpec websocketClientSpec() {
Supplier<? extends SocketAddress> remoteAddress) {
super(connectionProvider, options, remoteAddress);
this.acceptGzip = false;
this.acceptBrotli = false;
this.cookieDecoder = ClientCookieDecoder.STRICT;
this.cookieEncoder = ClientCookieEncoder.STRICT;
this.decoder = new HttpResponseDecoderSpec();
Expand All @@ -382,6 +394,7 @@ public WebsocketClientSpec websocketClientSpec() {
HttpClientConfig(HttpClientConfig parent) {
super(parent);
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.baseUrl = parent.baseUrl;
this.body = parent.body;
this.connector = parent.connector;
Expand Down Expand Up @@ -578,6 +591,7 @@ static void addStreamHandlers(
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
Expand All @@ -592,7 +606,7 @@ static void addStreamHandlers(
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER);

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -695,6 +709,7 @@ static void configureHttp3Pipeline(ChannelPipeline p, boolean removeMetricsRecor
static void configureHttp11OrH2CleartextPipeline(
ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
Http2Settings http2Settings,
@Nullable ChannelMetricsRecorder metricsRecorder,
Expand Down Expand Up @@ -726,7 +741,7 @@ static void configureHttp11OrH2CleartextPipeline(
Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));

HttpClientUpgradeHandler upgradeHandler =
new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
Expand All @@ -735,7 +750,7 @@ static void configureHttp11OrH2CleartextPipeline(
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand All @@ -760,6 +775,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
@SuppressWarnings("deprecation")
static void configureHttp11Pipeline(ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
Expand All @@ -776,7 +792,7 @@ static void configureHttp11Pipeline(ChannelPipeline p,
NettyPipeline.HttpCodec,
new HttpClientCodec(decoderConfig, decoder.failOnMissingResponse, decoder.parseHttpAfterConnectRequest));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -837,6 +853,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
static final class H2CleartextCodec extends ChannelHandlerAdapter {

final boolean acceptGzip;
final boolean acceptBrotli;
final Http2FrameCodec http2FrameCodec;
final ChannelMetricsRecorder metricsRecorder;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -848,11 +865,13 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
Http2FrameCodec http2FrameCodec,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
@Nullable Function<String, String> uriTagValue) {
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.http2FrameCodec = http2FrameCodec;
this.metricsRecorder = metricsRecorder;
this.opsFactory = opsFactory;
Expand All @@ -877,12 +896,12 @@ public void handlerAdded(ChannelHandlerContext ctx) {
if (responseTimeoutHandler != null) {
pipeline.remove(NettyPipeline.ResponseTimeoutHandler);
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress,
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress,
remoteAddress, responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue));
}
else {
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
}
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec)
.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler);
Expand All @@ -897,6 +916,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
static final class H2Codec extends ChannelInitializer<Channel> {

final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -911,26 +931,29 @@ static final class H2Codec extends ChannelInitializer<Channel> {
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this(owner, observer, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
this(owner, observer, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
}

H2Codec(
@Nullable Http2ConnectionProvider.DisposableAcquire owner,
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.observer = observer;
this.opsFactory = opsFactory;
Expand All @@ -949,7 +972,7 @@ protected void initChannel(Channel ch) {
setChannelContext(ch, owner.currentContext());
}
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue);
}
else {
// Handle server pushes (inbound streams)
Expand All @@ -973,6 +996,7 @@ public boolean isSharable() {

static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -983,6 +1007,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {

H2OrHttp11Codec(HttpClientChannelInitializer initializer, ConnectionObserver observer, SocketAddress remoteAddress) {
this.acceptGzip = initializer.acceptGzip;
this.acceptBrotli = initializer.acceptBrotli;
this.decoder = initializer.decoder;
this.http2Settings = initializer.http2Settings;
this.metricsRecorder = initializer.metricsRecorder;
Expand All @@ -1006,7 +1031,7 @@ public void channelActive(ChannelHandlerContext ctx) {
configureHttp2Pipeline(ctx.channel().pipeline(), decoder, http2Settings, observer);
}
else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else {
throw new IllegalStateException("unknown protocol: " + protocol);
Expand All @@ -1025,6 +1050,7 @@ else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
static final class HttpClientChannelInitializer implements ChannelPipelineConfigurer {

final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -1036,6 +1062,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig

HttpClientChannelInitializer(HttpClientConfig config) {
this.acceptGzip = config.acceptGzip;
this.acceptBrotli = config.acceptBrotli;
this.decoder = config.decoder;
this.http2Settings = config.http2Settings();
this.metricsRecorder = config.metricsRecorderInternal();
Expand All @@ -1059,7 +1086,7 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl
new H2OrHttp11Codec(this, observer, remoteAddress));
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h2) == h2) {
configureHttp2Pipeline(channel.pipeline(), decoder, http2Settings, observer);
Expand All @@ -1070,10 +1097,10 @@ else if ((protocols & h3) == h3) {
}
else {
if ((protocols & h11orH2C) == h11orH2C) {
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue);
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h2c) == h2c) {
configureHttp2Pipeline(channel.pipeline(), decoder, http2Settings, observer);
Expand Down
Loading
Loading