Skip to content

Commit

Permalink
Merge #2817 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed May 31, 2023
2 parents f7a90b5 + e5b4906 commit b29ef5c
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface Builder {
*/
Builder maxHeaderListSize(long maxHeaderListSize);

/**
* The connection is marked for closing once the number of all-time streams reaches {@code maxStreams}.
*
* @return {@code this}
* @since 1.0.33
*/
Builder maxStreams(long maxStreams);

/**
* Sets the {@code SETTINGS_ENABLE_PUSH} value.
*
Expand Down Expand Up @@ -147,6 +155,17 @@ public Long maxHeaderListSize() {
return maxHeaderListSize;
}

/**
* Returns the configured {@code maxStreams} value or null.
*
* @return the configured {@code maxStreams} value or null
* @since 1.0.33
*/
@Nullable
public Long maxStreams() {
return maxStreams;
}

/**
* Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null.
*
Expand All @@ -170,6 +189,7 @@ public boolean equals(Object o) {
Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) &&
Objects.equals(maxFrameSize, that.maxFrameSize) &&
maxHeaderListSize.equals(that.maxHeaderListSize) &&
Objects.equals(maxStreams, that.maxStreams) &&
Objects.equals(pushEnabled, that.pushEnabled);
}

Expand All @@ -181,6 +201,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(maxConcurrentStreams);
result = 31 * result + maxFrameSize;
result = 31 * result + Long.hashCode(maxHeaderListSize);
result = 31 * result + Long.hashCode(maxStreams);
result = 31 * result + Boolean.hashCode(pushEnabled);
return result;
}
Expand All @@ -190,19 +211,28 @@ public int hashCode() {
final Long maxConcurrentStreams;
final Integer maxFrameSize;
final Long maxHeaderListSize;
final Long maxStreams;
final Boolean pushEnabled;

Http2SettingsSpec(Build build) {
Http2Settings settings = build.http2Settings;
headerTableSize = settings.headerTableSize();
initialWindowSize = settings.initialWindowSize();
maxConcurrentStreams = settings.maxConcurrentStreams();
if (settings.maxConcurrentStreams() != null) {
maxConcurrentStreams = build.maxStreams != null ?
Math.min(settings.maxConcurrentStreams(), build.maxStreams) : settings.maxConcurrentStreams();
}
else {
maxConcurrentStreams = build.maxStreams;
}
maxFrameSize = settings.maxFrameSize();
maxHeaderListSize = settings.maxHeaderListSize();
maxStreams = build.maxStreams;
pushEnabled = settings.pushEnabled();
}

static final class Build implements Builder {
Long maxStreams;
final Http2Settings http2Settings = Http2Settings.defaultSettings();

@Override
Expand Down Expand Up @@ -240,6 +270,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) {
return this;
}

@Override
public Builder maxStreams(long maxStreams) {
if (maxStreams < 1) {
throw new IllegalArgumentException("maxStreams must be positive");
}
this.maxStreams = Long.valueOf(maxStreams);
return this;
}

/*
@Override
public Builder pushEnabled(boolean pushEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import io.netty5.handler.codec.http.HttpServerUpgradeHandler;
import io.netty5.handler.codec.http2.CleartextHttp2ServerUpgradeHandler;
import io.netty5.handler.codec.http2.Http2CodecUtil;
import io.netty5.handler.codec.http2.Http2ConnectionAdapter;
import io.netty5.handler.codec.http2.Http2FrameCodec;
import io.netty5.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty5.handler.codec.http2.Http2FrameLogger;
import io.netty5.handler.codec.http2.Http2MultiplexHandler;
import io.netty5.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty5.handler.codec.http2.Http2Settings;
import io.netty5.handler.codec.http2.Http2Stream;
import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty5.handler.logging.LogLevel;
import io.netty5.handler.logging.LoggingHandler;
Expand Down Expand Up @@ -341,7 +343,7 @@ else if (p == HttpProtocol.H2C) {
this._protocols = _protocols;
}

Http2Settings http2Settings() {
static Http2Settings http2Settings(@Nullable Http2SettingsSpec http2Settings) {
Http2Settings settings = Http2Settings.defaultSettings();

if (http2Settings != null) {
Expand Down Expand Up @@ -489,7 +491,7 @@ static void configureH2Pipeline(ChannelPipeline p,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
@Nullable Http2SettingsSpec http2SettingsSpec,
HttpMessageLogFactory httpMessageLogFactory,
@Nullable Duration idleTimeout,
ConnectionObserver listener,
Expand All @@ -504,11 +506,16 @@ static void configureH2Pipeline(ChannelPipeline p,
Http2FrameCodecBuilder http2FrameCodecBuilder =
Http2FrameCodecBuilder.forServer()
.validateHeaders(validate)
.initialSettings(http2Settings);
.initialSettings(http2Settings(http2SettingsSpec));

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
Long maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null;
if (enableGracefulShutdown || maxStreams != null) {
// 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
// 2. When 'maxStreams' is configured, the graceful shutdown is enabled.
// The graceful shutdown is configured with indefinite timeout because
// the response time is controlled by the user and might be different
// for the different requests.
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

Expand All @@ -517,7 +524,11 @@ static void configureH2Pipeline(ChannelPipeline p,
"reactor.netty5.http.server.h2"));
}

p.addLast(NettyPipeline.HttpCodec, http2FrameCodecBuilder.build())
Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();
if (maxStreams != null) {
http2FrameCodec.connection().addListener(new H2ConnectionListener(p.channel(), maxStreams));
}
p.addLast(NettyPipeline.HttpCodec, http2FrameCodec)
.addLast(NettyPipeline.H2MultiplexHandler,
new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressPredicate,
formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle,
Expand All @@ -543,7 +554,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
@Nullable Http2SettingsSpec http2SettingsSpec,
HttpMessageLogFactory httpMessageLogFactory,
@Nullable Duration idleTimeout,
ConnectionObserver listener,
Expand All @@ -560,10 +571,10 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,

Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate,
p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider,
forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());

ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader);
ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null);
CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler(
httpServerCodec,
new HttpServerUpgradeHandler<DefaultHttpContent>(httpServerCodec, upgrader, decoder.h2cMaxContentLength()),
Expand Down Expand Up @@ -742,29 +753,39 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
final Http11OrH2CleartextCodec upgrader;
final boolean addHttp2FrameCodec;
final boolean removeMetricsHandler;
final Long maxStreams;

/**
* Used when full H2 preface is received
*/
H2CleartextCodec(Http11OrH2CleartextCodec upgrader) {
this(upgrader, true, true);
H2CleartextCodec(Http11OrH2CleartextCodec upgrader, @Nullable Long maxStreams) {
this(upgrader, true, true, maxStreams);
}

/**
* Used when upgrading from HTTP/1.1 to H2. When an upgrade happens {@link Http2FrameCodec}
* is added by {@link Http2ServerUpgradeCodec}
*/
H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler) {
H2CleartextCodec(Http11OrH2CleartextCodec upgrader, boolean addHttp2FrameCodec, boolean removeMetricsHandler,
@Nullable Long maxStreams) {
this.upgrader = upgrader;
this.addHttp2FrameCodec = addHttp2FrameCodec;
this.removeMetricsHandler = removeMetricsHandler;
this.maxStreams = maxStreams;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
if (addHttp2FrameCodec) {
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build());
Http2FrameCodec http2FrameCodec = upgrader.http2FrameCodecBuilder.build();
if (maxStreams != null) {
http2FrameCodec.connection().addListener(new H2ConnectionListener(ctx.channel(), maxStreams));
}
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec);
}
else if (maxStreams != null) {
pipeline.get(Http2FrameCodec.class).connection().addListener(new H2ConnectionListener(ctx.channel(), maxStreams));
}

// Add this handler at the end of the pipeline as it does not forward all channelRead events
Expand Down Expand Up @@ -875,6 +896,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
final ConnectionObserver listener;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>>
mapHandle;
final Long maxStreams;
final ChannelMetricsRecorder metricsRecorder;
final int minCompressionSize;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -888,7 +910,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
@Nullable Http2SettingsSpec http2SettingsSpec,
HttpMessageLogFactory httpMessageLogFactory,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
Expand All @@ -905,11 +927,16 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
this.http2FrameCodecBuilder =
Http2FrameCodecBuilder.forServer()
.validateHeaders(validate)
.initialSettings(http2Settings);
.initialSettings(http2Settings(http2SettingsSpec));

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
this.maxStreams = http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null;
if (enableGracefulShutdown || maxStreams != null) {
// 1. Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
// 2. When 'maxStreams' is configured, the graceful shutdown is enabled.
// The graceful shutdown is configured with indefinite timeout because
// the response time is controlled by the user and might be different
// for the different requests.
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

Expand Down Expand Up @@ -942,14 +969,37 @@ protected void initChannel(Channel ch) {
@Nullable
public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false));
return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false, maxStreams));
}
else {
return null;
}
}
}

static final class H2ConnectionListener extends Http2ConnectionAdapter {

final Channel channel;
final long maxStreams;

long numStreams;

H2ConnectionListener(Channel channel, long maxStreams) {
this.channel = channel;
this.maxStreams = maxStreams;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onStreamActive(Http2Stream stream) {
assert channel.executor().inEventLoop();
if (++numStreams == maxStreams) {
//"FutureReturnValueIgnored" this is deliberate
channel.close();
}
}
}

static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler {

final boolean accessLogEnabled;
Expand All @@ -959,7 +1009,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
final Http2SettingsSpec http2SettingsSpec;
final HttpMessageLogFactory httpMessageLogFactory;
final Duration idleTimeout;
final ConnectionObserver listener;
Expand All @@ -980,7 +1030,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
this.enableGracefulShutdown = initializer.enableGracefulShutdown;
this.formDecoderProvider = initializer.formDecoderProvider;
this.forwardedHeaderHandler = initializer.forwardedHeaderHandler;
this.http2Settings = initializer.http2Settings;
this.http2SettingsSpec = initializer.http2SettingsSpec;
this.httpMessageLogFactory = initializer.httpMessageLogFactory;
this.idleTimeout = initializer.idleTimeout;
this.listener = listener;
Expand All @@ -1002,7 +1052,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1032,7 +1082,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
final Http2SettingsSpec http2SettingsSpec;
final HttpMessageLogFactory httpMessageLogFactory;
final Duration idleTimeout;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>>
Expand All @@ -1055,7 +1105,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
this.enableGracefulShutdown = config.channelGroup() != null;
this.formDecoderProvider = config.formDecoderProvider;
this.forwardedHeaderHandler = config.forwardedHeaderHandler;
this.http2Settings = config.http2Settings();
this.http2SettingsSpec = config.http2Settings;
this.httpMessageLogFactory = config.httpMessageLogFactory;
this.idleTimeout = config.idleTimeout;
this.mapHandle = config.mapHandle;
Expand Down Expand Up @@ -1117,7 +1167,7 @@ else if ((protocols & h2) == h2) {
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
http2SettingsSpec,
httpMessageLogFactory,
idleTimeout,
observer,
Expand All @@ -1140,7 +1190,7 @@ else if ((protocols & h2) == h2) {
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
http2SettingsSpec,
httpMessageLogFactory,
idleTimeout,
observer,
Expand Down Expand Up @@ -1178,7 +1228,7 @@ else if ((protocols & h2c) == h2c) {
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
http2SettingsSpec,
httpMessageLogFactory,
idleTimeout,
observer,
Expand Down
Loading

0 comments on commit b29ef5c

Please sign in to comment.