diff --git a/codequality/checkstyle.xml b/codequality/checkstyle.xml index 87e01f8411..2ed2bace7f 100644 --- a/codequality/checkstyle.xml +++ b/codequality/checkstyle.xml @@ -76,7 +76,7 @@ - + diff --git a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java index 951e6069af..244f96831b 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java +++ b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java @@ -93,6 +93,7 @@ public interface NettyPipeline { String CompressionHandler = LEFT + "compressionHandler"; String ConnectMetricsHandler = LEFT + "connectMetricsHandler"; String H2CUpgradeHandler = LEFT + "h2cUpgradeHandler"; + String H2Flush = LEFT + "h2Flush"; String H2MultiplexHandler = LEFT + "h2MultiplexHandler"; String H2OrHttp11Codec = LEFT + "h2OrHttp11Codec"; String H2ToHttp11Codec = LEFT + "h2ToHttp11Codec"; diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index bd3999ee3a..db831832c9 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -253,6 +253,68 @@ default String name() { return null; } + interface AllocationStrategy> { + + /** + * Returns a deep copy of this instance. + * + * @return a deep copy of this instance + */ + A copy(); + + /** + * Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be + * allocated. Should be paired with {@link #getPermits(int)} for an atomic permission. + * + * @return an ESTIMATED count of how many more connections can currently be allocated + */ + int estimatePermitCount(); + + /** + * Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible + * number of connections which MUST be created (otherwise the internal live counter of the strategy might be off). + * This permissible number might be zero, and it can also be a greater number than {@code desired}. + * Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)} + * (which can happen in batches or with value {@literal 1}). + * + * @param desired the desired number of new connections + * @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired} + */ + int getPermits(int desired); + + /** + * Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE} + * + * @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE} + */ + int permitGranted(); + + /** + * Return the minimum number of permits this strategy tries to maintain granted + * (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero. + * + * @return the minimum number of permits this strategy tries to maintain, or {@code 0} + */ + int permitMinimum(); + + /** + * Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded + * + * @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded + */ + int permitMaximum(); + + /** + * Update the strategy to indicate that N connections were discarded, potentially leaving space + * for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the + * number of held permits it has. + *

+ * Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits + * is not consistent with the strategy's limits and delivered permits. + */ + void returnPermits(int returned); + } + /** * Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of * {@link Connection}. Further connections will be pending acquisition depending on @@ -387,6 +449,7 @@ class ConnectionPoolSpec> implements Suppl boolean metricsEnabled; String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY; Supplier registrar; + AllocationStrategy allocationStrategy; /** * Returns {@link ConnectionPoolSpec} new instance with default properties. @@ -410,6 +473,7 @@ private ConnectionPoolSpec() { this.metricsEnabled = copy.metricsEnabled; this.leasingStrategy = copy.leasingStrategy; this.registrar = copy.registrar; + this.allocationStrategy = copy.allocationStrategy; } /** @@ -428,10 +492,13 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) { /** * Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool. + * This is a pre-made allocation strategy where only max connections is specified. + * Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}. * Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}. * * @param maxConnections the maximum number of connections (per connection pool) before start pending * @return {@literal this} + * @see #allocationStrategy(AllocationStrategy) * @throws IllegalArgumentException if maxConnections is negative */ public final SPEC maxConnections(int maxConnections) { @@ -439,6 +506,7 @@ public final SPEC maxConnections(int maxConnections) { throw new IllegalArgumentException("Max Connections value must be strictly positive"); } this.maxConnections = maxConnections; + this.allocationStrategy = null; return get(); } @@ -580,6 +648,22 @@ public final SPEC evictInBackground(Duration evictionInterval) { return get(); } + /** + * Limits in how many connections can be allocated and managed by the pool are driven by the + * provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last + * configured strategy, but most cases should be covered by the {@link #maxConnections()} + * pre-made allocation strategy. + * + * @param allocationStrategy the {@link AllocationStrategy} to use + * @return {@literal this} + * @see #maxConnections() + * @since 1.0.20 + */ + public final SPEC allocationStrategy(AllocationStrategy allocationStrategy) { + this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy"); + return get(); + } + @Override @SuppressWarnings("unchecked") public SPEC get() { diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index a19816e512..e065c4f268 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -30,7 +30,6 @@ import reactor.netty.ReactorNetty; import reactor.netty.transport.TransportConfig; import reactor.netty.internal.util.MapUtils; -import reactor.pool.AllocationStrategy; import reactor.pool.InstrumentedPool; import reactor.pool.Pool; import reactor.pool.PoolBuilder; @@ -89,6 +88,7 @@ public abstract class PooledConnectionProvider implements final Duration poolInactivity; final Duration disposeTimeout; final Map maxConnections = new HashMap<>(); + Mono onDispose; protected PooledConnectionProvider(Builder builder) { this(builder, null); @@ -106,6 +106,7 @@ protected PooledConnectionProvider(Builder builder) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); maxConnections.put(entry.getKey(), entry.getValue().maxConnections); } + this.onDispose = Mono.empty(); scheduleInactivePoolsDisposal(); } @@ -190,10 +191,10 @@ public final Mono disposeLater() { }) .collect(Collectors.toList()); if (pools.isEmpty()) { - return Mono.empty(); + return onDispose; } channelPools.clear(); - return Mono.when(pools); + return onDispose.and(Mono.when(pools)); }); } @@ -243,6 +244,10 @@ public String name() { return name; } + public void onDispose(Mono disposeMono) { + onDispose = onDispose.and(disposeMono); + } + protected abstract CoreSubscriber> createDisposableAcquire( TransportConfig config, ConnectionObserver connectionObserver, @@ -364,6 +369,7 @@ protected static final class PoolFactory { final Supplier registrar; final Clock clock; final Duration disposeTimeout; + final AllocationStrategy allocationStrategy; PoolFactory(ConnectionPoolSpec conf, Duration disposeTimeout) { this(conf, disposeTimeout, null); @@ -383,11 +389,12 @@ protected static final class PoolFactory { this.registrar = conf.registrar; this.clock = clock; this.disposeTimeout = disposeTimeout; + this.allocationStrategy = conf.allocationStrategy; } public InstrumentedPool newPool( Publisher allocator, - @Nullable AllocationStrategy allocationStrategy, + @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility Function> destroyHandler, BiPredicate evictionPredicate) { if (disposeTimeout != null) { @@ -399,7 +406,7 @@ public InstrumentedPool newPool( public InstrumentedPool newPool( Publisher allocator, - @Nullable AllocationStrategy allocationStrategy, + @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility Function> destroyHandler, BiPredicate evictionPredicate, Function, InstrumentedPool> poolFactory) { @@ -432,7 +439,12 @@ PoolBuilder> newPoolInternal( DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE)); } else { - poolBuilder = poolBuilder.sizeBetween(0, maxConnections); + if (allocationStrategy == null) { + poolBuilder = poolBuilder.sizeBetween(0, maxConnections); + } + else { + poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy.copy())); + } } if (clock != null) { @@ -449,6 +461,15 @@ PoolBuilder> newPoolInternal( return poolBuilder; } + @Nullable + public AllocationStrategy allocationStrategy() { + return allocationStrategy; + } + + public long maxIdleTime() { + return this.maxIdleTime; + } + public long maxLifeTime() { return maxLifeTime; } @@ -466,6 +487,45 @@ public String toString() { ", pendingAcquireTimeout=" + pendingAcquireTimeout + '}'; } + + static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy { + + final AllocationStrategy delegate; + + DelegatingAllocationStrategy(AllocationStrategy delegate) { + this.delegate = delegate; + } + + @Override + public int estimatePermitCount() { + return delegate.estimatePermitCount(); + } + + @Override + public int getPermits(int desired) { + return delegate.getPermits(desired); + } + + @Override + public int permitGranted() { + return delegate.permitGranted(); + } + + @Override + public int permitMinimum() { + return delegate.permitMinimum(); + } + + @Override + public int permitMaximum() { + return delegate.permitMaximum(); + } + + @Override + public void returnPermits(int returned) { + delegate.returnPermits(returned); + } + } } static final class PoolKey { diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 6a37523989..64802ac61a 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ class ConnectionProviderTest { + static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy(); static final String TEST_STRING = ""; static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; @@ -64,6 +65,9 @@ else if (Map.class == clazz) { else if (Supplier.class == clazz) { field.set(builder, TEST_SUPPLIER); } + else if (ConnectionProvider.AllocationStrategy.class == clazz) { + field.set(builder, TEST_ALLOCATION_STRATEGY); + } else if (boolean.class == clazz) { field.setBoolean(builder, true); } @@ -74,4 +78,41 @@ else if (int.class == clazz) { throw new IllegalArgumentException("Unknown field type " + clazz); } } + + static final class TestAllocationStrategy implements ConnectionProvider.AllocationStrategy { + + @Override + public TestAllocationStrategy copy() { + return this; + } + + @Override + public int estimatePermitCount() { + return 0; + } + + @Override + public int getPermits(int desired) { + return 0; + } + + @Override + public int permitGranted() { + return 0; + } + + @Override + public int permitMinimum() { + return 0; + } + + @Override + public int permitMaximum() { + return 0; + } + + @Override + public void returnPermits(int returned) { + } + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java index 801dcac824..a94268145b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java @@ -15,6 +15,7 @@ */ package reactor.netty.http; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) { http2ConnectionProvider = new AtomicReference<>(); } + @Override + public void disposeWhen(SocketAddress remoteAddress) { + ConnectionProvider provider = http2ConnectionProvider.get(); + if (provider != null) { + provider.disposeWhen(remoteAddress); + } + super.disposeWhen(remoteAddress); + } + @Override public AddressResolverGroup getOrCreateDefaultResolver() { return super.getOrCreateDefaultResolver(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java new file mode 100644 index 0000000000..2dd5c49c61 --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import reactor.netty.resources.ConnectionProvider; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * HTTP/2 {@link ConnectionProvider.AllocationStrategy}. + * + *

This class is based on + * https://github.com/reactor/reactor-pool/blob/d5cb5b72cdbcbbee8d781e06972c4da21766107f/src/main/java/reactor/pool/AllocationStrategies.java#L73 + * + * @author Violeta Georgieva + * @since 1.0.20 + */ +public final class Http2AllocationStrategy implements ConnectionProvider.AllocationStrategy { + + public interface Builder { + + /** + * Build a new {@link Http2AllocationStrategy} + * + * @return a new {@link Http2AllocationStrategy} + */ + Http2AllocationStrategy build(); + + /** + * Configures the maximum number of the concurrent streams that can be opened to the remote peer. + * When evaluating how many streams can be opened to the remote peer, + * the minimum of this configuration and the remote peer configuration is taken (unless -1 is used). + * Default to {@code -1} - use always the remote peer configuration. + * + * @param maxConcurrentStreams the maximum number of the concurrent streams that can be opened to the remote peer + * @return {@code this} + */ + Builder maxConcurrentStreams(long maxConcurrentStreams); + + /** + * Configures the maximum number of live connections to keep in the pool. + * Default to {@link Integer#MAX_VALUE} - no upper limit. + * + * @param maxConnections the maximum number of live connections to keep in the pool + * @return {@code this} + */ + Builder maxConnections(int maxConnections); + + /** + * Configures the minimum number of live connections to keep in the pool (can be the best effort). + * Default to {@code 0}. + * + * @return {@code this} + */ + Builder minConnections(int minConnections); + } + + /** + * Creates a builder for {@link Http2AllocationStrategy}. + * + * @return a new {@link Http2AllocationStrategy.Builder} + */ + public static Http2AllocationStrategy.Builder builder() { + return new Http2AllocationStrategy.Build(); + } + + @Override + public Http2AllocationStrategy copy() { + return new Http2AllocationStrategy(this); + } + + @Override + public int estimatePermitCount() { + return PERMITS.get(this); + } + + @Override + public int getPermits(int desired) { + if (desired < 0) { + return 0; + } + + for (;;) { + int p = permits; + int target = Math.min(desired, p); + + if (PERMITS.compareAndSet(this, p, p - target)) { + return target; + } + } + } + + /** + * Returns the configured maximum number of the concurrent streams that can be opened to the remote peer. + * + * @return the configured maximum number of the concurrent streams that can be opened to the remote peer + */ + public long maxConcurrentStreams() { + return maxConcurrentStreams; + } + + @Override + public int permitGranted() { + return maxConnections - PERMITS.get(this); + } + + @Override + public int permitMinimum() { + return minConnections; + } + + @Override + public int permitMaximum() { + return maxConnections; + } + + @Override + public void returnPermits(int returned) { + for (;;) { + int p = PERMITS.get(this); + if (p + returned > maxConnections) { + throw new IllegalArgumentException("Too many permits returned: returned=" + returned + + ", would bring to " + (p + returned) + "/" + maxConnections); + } + if (PERMITS.compareAndSet(this, p, p + returned)) { + return; + } + } + } + + final long maxConcurrentStreams; + final int maxConnections; + final int minConnections; + + volatile int permits; + static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits"); + + Http2AllocationStrategy(Build build) { + this.maxConcurrentStreams = build.maxConcurrentStreams; + this.maxConnections = build.maxConnections; + this.minConnections = build.minConnections; + PERMITS.lazySet(this, this.maxConnections); + } + + Http2AllocationStrategy(Http2AllocationStrategy copy) { + this.maxConcurrentStreams = copy.maxConcurrentStreams; + this.maxConnections = copy.maxConnections; + this.minConnections = copy.minConnections; + PERMITS.lazySet(this, this.maxConnections); + } + + static final class Build implements Builder { + static final long DEFAULT_MAX_CONCURRENT_STREAMS = -1; + static final int DEFAULT_MAX_CONNECTIONS = Integer.MAX_VALUE; + static final int DEFAULT_MIN_CONNECTIONS = 0; + + long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS; + int maxConnections = DEFAULT_MAX_CONNECTIONS; + int minConnections = DEFAULT_MIN_CONNECTIONS; + + @Override + public Http2AllocationStrategy build() { + if (minConnections > maxConnections) { + throw new IllegalArgumentException("minConnections (" + minConnections + ")" + + " must be less than or equal to maxConnections (" + maxConnections + ")"); + } + return new Http2AllocationStrategy(this); + } + + @Override + public Builder maxConcurrentStreams(long maxConcurrentStreams) { + if (maxConcurrentStreams < -1) { + throw new IllegalArgumentException("maxConcurrentStreams must be greater than or equal to -1"); + } + this.maxConcurrentStreams = maxConcurrentStreams; + return this; + } + + @Override + public Builder maxConnections(int maxConnections) { + if (maxConnections < 1) { + throw new IllegalArgumentException("maxConnections must be strictly positive"); + } + this.maxConnections = maxConnections; + return this; + } + + @Override + public Builder minConnections(int minConnections) { + if (minConnections < 0) { + throw new IllegalArgumentException("minConnections must be positive or zero"); + } + this.minConnections = minConnections; + return this; + } + } +} diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index cf76c73902..b80d44b553 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -16,13 +16,13 @@ package reactor.netty.http.client; import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.ssl.ApplicationProtocolNames; -import io.netty.handler.ssl.SslHandler; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; @@ -37,7 +37,6 @@ import reactor.core.publisher.Operators; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.ConnectionProvider; @@ -76,6 +75,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider Http2ConnectionProvider(ConnectionProvider parent) { super(initConfiguration(parent)); this.parent = parent; + if (parent instanceof PooledConnectionProvider) { + ((PooledConnectionProvider) parent).onDispose(disposeLater()); + } } static Builder initConfiguration(ConnectionProvider parent) { @@ -299,8 +301,7 @@ else if (p.state != null) { return; } - HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue) - .addListener(this); + http2StreamChannelBootstrap(channel).open().addListener(this); } @Override @@ -332,11 +333,12 @@ public void onUncaughtException(Connection connection, Throwable error) { @Override public void operationComplete(Future future) { Channel channel = pooledRef.poolable().channel(); - Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class); + ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx(); if (future.isSuccess()) { Http2StreamChannel ch = future.getNow(); - if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) { + if (!channel.isActive() || frameCodec == null || + !((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) { invalidate(this); if (!retried) { if (log.isDebugEnabled()) { @@ -352,14 +354,18 @@ public void operationComplete(Future future) { } } else { + Http2ConnectionProvider.registerClose(ch, this); + HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())), + opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue); + ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { obs.onStateChange(ops, STREAM_CONFIGURED); sink.success(ops); } - Http2Connection.Endpoint localEndpoint = frameCodec.connection().local(); if (log.isDebugEnabled()) { + Http2Connection.Endpoint localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local(); logStreamsState(ch, localEndpoint, "Stream opened"); } } @@ -372,8 +378,8 @@ public void operationComplete(Future future) { boolean isH2cUpgrade() { Channel channel = pooledRef.poolable().channel(); - if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null && - channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) { + if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { sink.success(ops); @@ -385,11 +391,9 @@ boolean isH2cUpgrade() { boolean notHttp2() { Channel channel = pooledRef.poolable().channel(); - ChannelPipeline pipeline = channel.pipeline(); - SslHandler handler = pipeline.get(SslHandler.class); - if (handler != null) { - String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; - if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol; + if (applicationProtocol != null) { + if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) { // No information for the negotiated application-level protocol, // or it is HTTP/1.1, continue as an HTTP/1.1 request // and remove the connection from this pool. @@ -400,15 +404,15 @@ boolean notHttp2() { return true; } } - else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) { + else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) { channel.attr(OWNER).set(null); invalidate(this); - sink.error(new IOException("Unknown protocol [" + protocol + "].")); + sink.error(new IOException("Unknown protocol [" + applicationProtocol + "].")); return true; } } - else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && - pipeline.get(NettyPipeline.H2MultiplexHandler) == null) { + else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { // It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge, // continue as an HTTP/1.1 request and remove the connection from this pool. ChannelOperations ops = ChannelOperations.get(channel); @@ -420,6 +424,27 @@ else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && } return false; } + + static final AttributeKey HTTP2_STREAM_CHANNEL_BOOTSTRAP = + AttributeKey.valueOf("http2StreamChannelBootstrap"); + + static Http2StreamChannelBootstrap http2StreamChannelBootstrap(Channel channel) { + Http2StreamChannelBootstrap http2StreamChannelBootstrap; + + for (;;) { + http2StreamChannelBootstrap = channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP).get(); + if (http2StreamChannelBootstrap == null) { + http2StreamChannelBootstrap = new Http2StreamChannelBootstrap(channel); + } + else { + return http2StreamChannelBootstrap; + } + if (channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP) + .compareAndSet(null, http2StreamChannelBootstrap)) { + return http2StreamChannelBootstrap; + } + } + } } static final class PendingConnectionObserver implements ConnectionObserver { @@ -466,7 +491,7 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime())); + poolConFig -> new Http2Pool(poolConFig, poolFactory.allocationStrategy(), poolFactory.maxIdleTime(), poolFactory.maxLifeTime())); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a86dd87f3b..a1472c1c74 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -17,16 +17,23 @@ import java.time.Clock; import java.time.Duration; +import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslHandler; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -36,8 +43,8 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; +import reactor.netty.FutureMono; +import reactor.netty.NettyPipeline; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -45,6 +52,7 @@ import reactor.netty.internal.shaded.reactor.pool.PoolShutdownException; import reactor.netty.internal.shaded.reactor.pool.PooledRef; import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata; +import reactor.netty.resources.ConnectionProvider; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.annotation.Nullable; @@ -59,7 +67,7 @@ *

    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • - *
  • The connection has no active streams.
  • + *
  • The connection has reached its idle time and there are no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -75,17 +83,18 @@ *

* This pool always invalidate the {@link PooledRef}, there is no release functionality. *

    - *
  • {@link PoolMetrics#acquiredSize()} and {@link PoolMetrics#allocatedSize()} always return the number of - * the active streams from all connections currently in the pool.
  • - *
  • {@link PoolMetrics#idleSize()} always returns {@code 0}.
  • + *
  • {@link PoolMetrics#acquiredSize()}, {@link PoolMetrics#allocatedSize()} and {@link PoolMetrics#idleSize()} + * always return the number of the cached connections.
  • + *
  • {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
  • *
*

+ * If minimum connections is specified, the cached connections with active streams will be kept at that minimum + * (can be the best effort). However, if the cached connections have reached max concurrent streams, + * then new connections will be allocated up to the maximum connections limit. + *

* Configurations that are not applicable *

    *
  • {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
  • - *
  • {@link PoolConfig#evictInBackgroundInterval()} and {@link PoolConfig#evictInBackgroundScheduler()} - - * there are no idle resources in the pool. Once the connection does not have active streams, it - * is returned to the parent pool.
  • *
  • {@link PoolConfig#evictionPredicate()} - the eviction predicate cannot be used as more complex * checks have to be done. Also the pool uses filtering for the connections (a connection might not be able * to be used but is required to stay in the pool).
  • @@ -94,7 +103,6 @@ *
  • {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
  • *
  • FIFO is used when obtaining the pending borrowers
  • *
  • Warm up functionality is not supported
  • - *
  • Setting minimum connections configuration is not supported
  • *
*

This class is based on * https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java @@ -114,6 +122,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. static final AtomicReferenceFieldUpdater CONNECTIONS = AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections"); + volatile int idleSize; + private static final AtomicIntegerFieldUpdater IDLE_SIZE = + AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize"); + /** * Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that, * use addPending/pollPending/removePending methods which take care of maintaining the pending queue size. @@ -135,28 +147,41 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. @SuppressWarnings("rawtypes") static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque(); + volatile long totalMaxConcurrentStreams; + static final AtomicLongFieldUpdater TOTAL_MAX_CONCURRENT_STREAMS = + AtomicLongFieldUpdater.newUpdater(Http2Pool.class, "totalMaxConcurrentStreams"); + volatile int wip; static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final Long maxConcurrentStreams; + final long maxIdleTime; final long maxLifeTime; + final int minConnections; final PoolConfig poolConfig; long lastInteractionTimestamp; - Http2Pool(PoolConfig poolConfig, long maxLifeTime) { - if (poolConfig.allocationStrategy().getPermits(0) != 0) { - throw new IllegalArgumentException("No support for configuring minimum number of connections"); - } + Disposable evictionTask; + + Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy, + long maxIdleTime, long maxLifeTime) { this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? + ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; + this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; + this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; recordInteractionTimestamp(); + + scheduleEviction(); } @Override @@ -171,12 +196,12 @@ public Mono> acquire(Duration timeout) { @Override public int acquiredSize() { - return acquired; + return allocatedSize() - idleSize(); } @Override public int allocatedSize() { - return acquired; + return poolConfig.allocationStrategy().permitGranted(); } @Override @@ -192,15 +217,26 @@ public Mono disposeLater() { @SuppressWarnings("unchecked") ConcurrentLinkedDeque q = PENDING.getAndSet(this, TERMINATED); if (q != TERMINATED) { + evictionTask.dispose(); + Borrower p; while ((p = pollPending(q, true)) != null) { p.fail(new PoolShutdownException()); } - // the last stream on that connection will release the connection to the parent pool - // the structure should not contain connections with 0 streams as the last stream on that connection - // always removes the connection from this pool - CONNECTIONS.getAndSet(this, null); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.getAndSet(this, null); + if (slots != null) { + Mono closeMonos = Mono.empty(); + while (!slots.isEmpty()) { + Slot slot = pollSlot(slots); + if (slot != null) { + slot.invalidate(); + closeMonos = closeMonos.and(DEFAULT_DESTROY_HANDLER.apply(slot.connection)); + } + } + return closeMonos; + } } return Mono.empty(); }); @@ -218,7 +254,7 @@ public int getMaxPendingAcquireSize() { @Override public int idleSize() { - return 0; + return idleSize; } @Override @@ -253,6 +289,10 @@ public Mono warmup() { return Mono.just(0); } + int activeStreams() { + return acquired; + } + void cancelAcquire(Borrower borrower) { if (!isDisposed()) { ConcurrentLinkedDeque q = pending; @@ -260,15 +300,32 @@ void cancelAcquire(Borrower borrower) { } } + @SuppressWarnings("FutureReturnValueIgnored") Mono destroyPoolable(Http2PooledRef ref) { + assert ref.slot.connection.channel().eventLoop().inEventLoop(); Mono mono = Mono.empty(); try { + // By default, check the connection for removal on acquire and invalidate (only if there are no active streams) if (ref.slot.decrementConcurrencyAndGet() == 0) { - ref.slot.invalidate(); - Connection connection = ref.poolable(); - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - if (frameCodec != null) { - releaseConnection(connection); + // not HTTP/2 request + if (ref.slot.http2FrameCodecCtx() == null) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // If there is eviction in background, the background process will remove this connection + else if (poolConfig.evictInBackgroundInterval().isZero()) { + // not active + if (!ref.poolable().channel().isActive()) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // max life reached + else if (maxLifeReached(ref.slot)) { + //"FutureReturnValueIgnored" this is deliberate + ref.slot.connection.channel().close(); + ref.slot.invalidate(); + removeSlot(ref.slot); + } } } } @@ -311,81 +368,89 @@ void drainLoop() { if (borrowersCount != 0) { // find a connection that can be used for opening a new stream - Slot slot = findConnection(resources); + // when cached connections are below minimum connections, then allocate a new connection + boolean belowMinConnections = minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() < minConnections; + Slot slot = belowMinConnections ? null : findConnection(resources); if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { - resources.offer(slot); + offerSlot(resources, slot); continue; } if (isDisposed()) { borrower.fail(new PoolShutdownException()); return; } - if (slot.incrementConcurrencyAndGet() > 1) { - borrower.stopPendingCountdown(); - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - // we are ready here, the connection can be used for opening another stream - slot.deactivate(); - poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot))); - } - else { - addPending(borrowers, borrower, true); - continue; + borrower.stopPendingCountdown(); + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Channel activated")); } + ACQUIRED.incrementAndGet(this); + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); + drain(); + }); } else { - int permits = poolConfig.allocationStrategy().getPermits(1); - if (permits <= 0) { - if (maxPending >= 0) { - borrowersCount = pendingSize; - int toCull = borrowersCount - maxPending; - for (int i = 0; i < toCull; i++) { - Borrower extraneous = pollPending(borrowers, true); - if (extraneous != null) { - pendingAcquireLimitReached(extraneous, maxPending); - } - } - } + int resourcesCount = idleSize; + if (minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() >= minConnections && + resourcesCount == 0) { + // connections allocations were triggered } else { - Borrower borrower = pollPending(borrowers, true); - if (borrower == null) { - continue; + int permits = poolConfig.allocationStrategy().getPermits(1); + if (permits <= 0) { + if (maxPending >= 0) { + borrowersCount = pendingSize; + int toCull = borrowersCount - maxPending; + for (int i = 0; i < toCull; i++) { + Borrower extraneous = pollPending(borrowers, true); + if (extraneous != null) { + pendingAcquireLimitReached(extraneous, maxPending); + } + } + } } - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; + else { + if (permits > 1) { + // warmup is not supported + poolConfig.allocationStrategy().returnPermits(permits - 1); + } + Borrower borrower = pollPending(borrowers, true); + if (borrower == null) { + continue; + } + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; + } + borrower.stopPendingCountdown(); + Mono allocator = poolConfig.allocator(); + Mono primary = + allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + Connection newInstance = sig.get(); + assert newInstance != null; + Slot newSlot = new Slot(this, newInstance); + if (log.isDebugEnabled()) { + log.debug(format(newInstance.channel(), "Channel activated")); + } + ACQUIRED.incrementAndGet(this); + borrower.deliver(new Http2PooledRef(newSlot)); + } + else if (sig.isOnError()) { + Throwable error = sig.getThrowable(); + assert error != null; + poolConfig.allocationStrategy().returnPermits(1); + borrower.fail(error); + } + }) + .contextWrite(borrower.currentContext()); + + primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } - borrower.stopPendingCountdown(); - Mono allocator = poolConfig.allocator(); - Mono primary = - allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - Connection newInstance = sig.get(); - assert newInstance != null; - Slot newSlot = new Slot(this, newInstance); - if (log.isDebugEnabled()) { - log.debug(format(newInstance.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - newSlot.incrementConcurrencyAndGet(); - newSlot.deactivate(); - borrower.deliver(new Http2PooledRef(newSlot)); - } - else if (sig.isOnError()) { - Throwable error = sig.getThrowable(); - assert error != null; - poolConfig.allocationStrategy().returnPermits(1); - borrower.fail(error); - } - }) - .contextWrite(borrower.currentContext()); - - primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } } } @@ -397,16 +462,78 @@ else if (sig.isOnError()) { } } + @SuppressWarnings("FutureReturnValueIgnored") + void evictInBackground() { + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue resources = CONNECTIONS.get(this); + if (resources == null) { + //no need to schedule the task again, pool has been disposed + return; + } + + if (WIP.getAndIncrement(this) == 0) { + if (pendingSize == 0) { + Iterator slots = resources.iterator(); + while (slots.hasNext()) { + Slot slot = slots.next(); + if (slot.concurrency() == 0) { + if (!slot.connection.channel().isActive()) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool")); + } + recordInteractionTimestamp(); + slots.remove(); + IDLE_SIZE.decrementAndGet(this); + slot.invalidate(); + continue; + } + + if (maxLifeReached(slot)) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + recordInteractionTimestamp(); + slots.remove(); + IDLE_SIZE.decrementAndGet(this); + slot.invalidate(); + continue; + } + } + if (maxIdleReached(slot)) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + recordInteractionTimestamp(); + slots.remove(); + IDLE_SIZE.decrementAndGet(this); + slot.invalidate(); + } + } + } + //at the end if there are racing drain calls, go into the drainLoop + if (WIP.decrementAndGet(this) > 0) { + drainLoop(); + } + } + //schedule the next iteration + scheduleEviction(); + } + @Nullable + @SuppressWarnings("FutureReturnValueIgnored") Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = resources.size(); + int resourcesCount = idleSize; while (resourcesCount > 0) { // There are connections in the queue resourcesCount--; // get the connection - Slot slot = resources.poll(); + Slot slot = pollSlot(resources); if (slot == null) { continue; } @@ -418,38 +545,51 @@ Slot findConnection(ConcurrentLinkedQueue resources) { log.debug(format(slot.connection.channel(), "Channel is closed, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool")); } - resources.remove(slot); + slot.invalidate(); } continue; } - // check that the connection's max lifetime has not been reached - if (maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime) { + // check whether the connection's idle time has been reached + if (maxIdleReached(slot)) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); + continue; + } + + // check whether the connection's max lifetime has been reached + if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); } - resources.remove(slot); + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); } continue; } // check that the connection's max active streams has not been reached if (!slot.canOpenStream()) { - resources.offer(slot); + offerSlot(resources, slot); if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max active streams is reached")); } @@ -462,6 +602,14 @@ Slot findConnection(ConcurrentLinkedQueue resources) { return null; } + boolean maxIdleReached(Slot slot) { + return maxIdleTime != -1 && slot.idleTime() >= maxIdleTime; + } + + boolean maxLifeReached(Slot slot) { + return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime; + } + void pendingAcquireLimitReached(Borrower borrower, int maxPending) { if (maxPending == 0) { borrower.fail(new PoolAcquirePendingLimitException(0, @@ -530,33 +678,51 @@ int addPending(ConcurrentLinkedDeque borrowers, Borrower borrower, boo return PENDING_SIZE.incrementAndGet(this); } - static boolean offerSlot(Slot slot) { - @SuppressWarnings("unchecked") - ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - return q != null && q.offer(slot); + void offerSlot(@Nullable ConcurrentLinkedQueue slots, Slot slot) { + if (slots != null && slots.offer(slot)) { + IDLE_SIZE.incrementAndGet(this); + } } - static void releaseConnection(Connection connection) { - ChannelOperations ops = connection.as(ChannelOperations.class); - if (ops != null) { - ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING); - } - else if (connection instanceof ConnectionObserver) { - ((ConnectionObserver) connection).onStateChange(connection, ConnectionObserver.State.DISCONNECTING); + @Nullable + Slot pollSlot(@Nullable ConcurrentLinkedQueue slots) { + if (slots == null) { + return null; } - else { - connection.dispose(); + Slot slot = slots.poll(); + if (slot != null) { + IDLE_SIZE.decrementAndGet(this); } + return slot; } - static void removeSlot(Slot slot) { + void removeSlot(Slot slot) { @SuppressWarnings("unchecked") ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - if (q != null) { - q.remove(slot); + if (q != null && q.remove(slot)) { + IDLE_SIZE.decrementAndGet(this); } } + void scheduleEviction() { + if (!poolConfig.evictInBackgroundInterval().isZero()) { + long nanosEvictionInterval = poolConfig.evictInBackgroundInterval().toNanos(); + this.evictionTask = poolConfig.evictInBackgroundScheduler() + .schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS); + } + else { + this.evictionTask = Disposables.disposed(); + } + } + + static final Function> DEFAULT_DESTROY_HANDLER = + connection -> { + if (!connection.channel().isActive()) { + return Mono.empty(); + } + return FutureMono.from(connection.channel().close()); + }; + static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable { static final Disposable TIMEOUT_DISPOSED = Disposables.disposed(); @@ -589,7 +755,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - if (!acquireTimeout.isZero()) { + long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired; + int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); + int pending = pool.pendingSize; + if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -627,7 +796,9 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - stopPendingCountdown(); + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + poolSlot.slot.incrementConcurrencyAndGet(); + poolSlot.slot.deactivate(); if (get()) { //CANCELLED or timeout reached poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); @@ -737,7 +908,7 @@ public String toString() { } } - static final class Slot { + static final class Slot extends AtomicBoolean { volatile int concurrency; static final AtomicIntegerFieldUpdater CONCURRENCY = @@ -746,18 +917,47 @@ static final class Slot { final Connection connection; final long creationTimestamp; final Http2Pool pool; + final String applicationProtocol; + + long idleTimestamp; + long maxConcurrentStreams; + + volatile ChannelHandlerContext http2FrameCodecCtx; + volatile ChannelHandlerContext http2MultiplexHandlerCtx; + volatile ChannelHandlerContext h2cUpgradeHandlerCtx; Slot(Http2Pool pool, Connection connection) { this.connection = connection; this.creationTimestamp = pool.clock.millis(); this.pool = pool; + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + this.maxConcurrentStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams : + Math.min(pool.maxConcurrentStreams, maxConcurrentStreams); + } + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams); + SslHandler handler = connection.channel().pipeline().get(SslHandler.class); + if (handler != null) { + this.applicationProtocol = handler.applicationProtocol() != null ? + handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; + } + else { + this.applicationProtocol = null; + } } boolean canOpenStream() { - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - Http2MultiplexHandler multiplexHandler = connection.channel().pipeline().get(Http2MultiplexHandler.class); - if (frameCodec != null && multiplexHandler != null) { - int maxActiveStreams = frameCodec.connection().local().maxActiveStreams(); + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + long maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + maxActiveStreams = pool.maxConcurrentStreams == -1 ? maxActiveStreams : + Math.min(pool.maxConcurrentStreams, maxActiveStreams); + long diff = maxActiveStreams - maxConcurrentStreams; + if (diff != 0) { + maxConcurrentStreams = maxActiveStreams; + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, diff); + } int concurrency = this.concurrency; return concurrency < maxActiveStreams; } @@ -772,23 +972,70 @@ void deactivate() { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Channel deactivated")); } - offerSlot(this); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.get(pool); + pool.offerSlot(slots, this); } int decrementConcurrencyAndGet() { - return CONCURRENCY.decrementAndGet(this); + int concurrency = CONCURRENCY.decrementAndGet(this); + idleTimestamp = pool.clock.millis(); + return concurrency; } - int incrementConcurrencyAndGet() { - return CONCURRENCY.incrementAndGet(this); + long idleTime() { + if (concurrency() > 0) { + return 0L; + } + long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; + return pool.clock.millis() - idleTime; + } + + @Nullable + ChannelHandlerContext http2FrameCodecCtx() { + ChannelHandlerContext ctx = http2FrameCodecCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2FrameCodec.class); + http2FrameCodecCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext http2MultiplexHandlerCtx() { + ChannelHandlerContext ctx = http2MultiplexHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2MultiplexHandler.class); + http2MultiplexHandlerCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext h2cUpgradeHandlerCtx() { + ChannelHandlerContext ctx = h2cUpgradeHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(NettyPipeline.H2CUpgradeHandler); + h2cUpgradeHandlerCtx = ctx; + return ctx; + } + + void incrementConcurrencyAndGet() { + CONCURRENCY.incrementAndGet(this); } void invalidate() { - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), "Channel removed from pool")); + if (compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug(format(connection.channel(), "Channel removed from pool")); + } + pool.poolConfig.allocationStrategy().returnPermits(1); + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams); } - pool.poolConfig.allocationStrategy().returnPermits(1); - removeSlot(this); } long lifeTime() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java index a432f55d41..f1808b573a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java @@ -17,15 +17,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; -import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; - -import static reactor.netty.ReactorNetty.format; /** * This handler is intended to work together with {@link Http2StreamFrameToHttpObjectCodec} @@ -35,33 +31,14 @@ * @author Violeta Georgieva * @since 1.0.0 */ +@ChannelHandler.Sharable final class Http2StreamBridgeClientHandler extends ChannelDuplexHandler { - final ConnectionObserver observer; - final ChannelOperations.OnSetup opsFactory; - - Http2StreamBridgeClientHandler(ConnectionObserver listener, ChannelOperations.OnSetup opsFactory) { - this.observer = listener; - this.opsFactory = opsFactory; - } - @Override public void channelActive(ChannelHandlerContext ctx) { ctx.read(); } - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - if (HttpClientOperations.log.isDebugEnabled()) { - HttpClientOperations.log.debug(format(ctx.channel(), "New HTTP/2 stream")); - } - - ChannelOperations ops = opsFactory.create(Connection.from(ctx.channel()), observer, null); - if (ops != null) { - ops.bind(); - } - } - @Override @SuppressWarnings("FutureReturnValueIgnored") public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 3eeceabf3d..b5eee9e349 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -52,16 +52,14 @@ import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2MultiplexHandler; import io.netty.handler.codec.http2.Http2Settings; -import io.netty.handler.codec.http2.Http2StreamChannel; -import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; +import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.resolver.AddressResolverGroup; -import io.netty.util.concurrent.Future; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.ChannelPipelineConfigurer; @@ -504,6 +502,65 @@ Http2Settings http2Settings() { return settings; } + static void addStreamHandlers( + Channel ch, + ConnectionObserver obs, + ChannelOperations.OnSetup opsFactory, + boolean acceptGzip, + @Nullable ChannelMetricsRecorder metricsRecorder, + long responseTimeoutMillis, + @Nullable Function uriTagValue) { + + if (HttpClientOperations.log.isDebugEnabled()) { + HttpClientOperations.log.debug(format(ch, "New HTTP/2 stream")); + } + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) + .addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER); + + if (acceptGzip) { + pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); + } + + ChannelOperations.addReactiveBridge(ch, opsFactory, obs); + + if (metricsRecorder != null) { + if (metricsRecorder instanceof HttpClientMetricsRecorder) { + ChannelHandler handler; + Channel parent = ch.parent(); + ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler); + if (existingHandler != null) { + // This use case can happen only in HTTP/2 clear text connection upgrade + parent.pipeline().remove(NettyPipeline.HttpMetricsHandler); + handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? + new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) : + new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler); + } + else { + handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? + new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) : + new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue); + } + pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); + } + } + + if (responseTimeoutMillis > -1) { + Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); + } + + if (log.isDebugEnabled()) { + log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline()); + } + + ChannelOperations ops = opsFactory.create(Connection.from(ch), obs, null); + if (ops != null) { + ops.bind(); + } + } + static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, ConnectionObserver observer) { Http2FrameCodecBuilder http2FrameCodecBuilder = @@ -516,7 +573,8 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe "reactor.netty.http.client.h2")); } - p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) + p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2Flush, new FlushConsolidationHandler(1024, true)) + .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, http2FrameCodecBuilder.build()) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(acceptGzip))) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer)); } @@ -608,20 +666,6 @@ static void configureHttp11Pipeline(ChannelPipeline p, } } - static Future openStream( - Channel channel, - Http2ConnectionProvider.DisposableAcquire owner, - ConnectionObserver observer, - ChannelOperations.OnSetup opsFactory, - boolean acceptGzip, - @Nullable ChannelMetricsRecorder metricsRecorder, - @Nullable Function uriTagValue) { - Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); - bootstrap.option(ChannelOption.AUTO_READ, false); - bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); - return bootstrap.open(); - } - static final Pattern FOLLOW_REDIRECT_CODES = Pattern.compile("30[12378]"); static final BiPredicate FOLLOW_REDIRECT_PREDICATE = @@ -639,6 +683,12 @@ static Future openStream( static final int h11orH2C = h11 | h2c; + static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT = + new Http2StreamFrameToHttpObjectCodec(false); + + static final Http2StreamBridgeClientHandler HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER = + new Http2StreamBridgeClientHandler(); + static final Logger log = Loggers.getLogger(HttpClientConfig.class); static final LoggingHandler LOGGING_HANDLER = @@ -706,6 +756,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { } static final class H2Codec extends ChannelInitializer { + final boolean acceptGzip; final ChannelMetricsRecorder metricsRecorder; final ConnectionObserver observer; @@ -759,55 +810,14 @@ static final class H2Codec extends ChannelInitializer { protected void initChannel(Channel ch) { if (observer != null && opsFactory != null && owner != null) { Http2ConnectionProvider.registerClose(ch, owner); - addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory); + addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory, + acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue); } else { // Handle server pushes (inbound streams) // TODO this is not supported } } - - void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory) { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(false)) - .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeClientHandler(obs, opsFactory)); - - if (acceptGzip) { - pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); - } - - ChannelOperations.addReactiveBridge(ch, opsFactory, obs); - - if (metricsRecorder != null) { - if (metricsRecorder instanceof HttpClientMetricsRecorder) { - ChannelHandler handler; - Channel parent = ch.parent(); - ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler); - if (existingHandler != null) { - // This use case can happen only in HTTP/2 clear text connection upgrade - parent.pipeline().remove(NettyPipeline.HttpMetricsHandler); - handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? - new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) : - new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler); - } - else { - handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ? - new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) : - new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue); - } - pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler); - } - } - - if (responseTimeoutMillis > -1) { - Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, - new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); - } - - if (log.isDebugEnabled()) { - log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline()); - } - } } static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 08bd8ee6af..3543fe1608 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -22,16 +22,21 @@ import java.net.SocketAddress; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; import static reactor.netty.Metrics.ACTIVE_STREAMS; import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; import static reactor.netty.Metrics.ID; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; import static reactor.netty.Metrics.NAME; import static reactor.netty.Metrics.PENDING_STREAMS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { + static final String ACTIVE_CONNECTIONS_DESCRIPTION = + "The number of the connections that have been successfully acquired and are in active use"; static final String ACTIVE_STREAMS_DESCRIPTION = "The number of the active HTTP/2 streams"; + static final String IDLE_CONNECTIONS_DESCRIPTION = "The number of the idle connections"; static final String PENDING_STREAMS_DESCRIPTION = "The number of requests that are waiting for opening HTTP/2 stream"; @@ -45,11 +50,21 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In String addressAsString = Metrics.formatSocketAddress(remoteAddress); Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName); - Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + .description(ACTIVE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) .description(ACTIVE_STREAMS_DESCRIPTION) .tags(tags) .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::idleSize) + .description(IDLE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, metrics, InstrumentedPool.PoolMetrics::pendingAcquireSize) .description(PENDING_STREAMS_DESCRIPTION) .tags(tags) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 6cda411430..9e05323be3 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -414,7 +414,7 @@ static void addStreamHandlers(Channel ch, if (accessLogEnabled) { pipeline.addLast(NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H2.create(accessLog)); } - pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(true)) + pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler, listener, mapHandle)); @@ -664,6 +664,9 @@ static void configureHttp11Pipeline(ChannelPipeline p, static final int h11orH2C = h11 | h2c; + static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT = + new Http2StreamFrameToHttpObjectCodec(true); + static final Logger log = Loggers.getLogger(HttpServerConfig.class); static final LoggingHandler LOGGING_HANDLER = diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java index 22d6210df9..328715fbe0 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java @@ -407,7 +407,7 @@ private void doTestResponseTimeout(HttpClient client, long expectedTimeout) timeout.set(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis()); } }) - .doOnDisconnected(conn -> onDisconnected.set(handlerAvailable.test(conn))); + .doOnDisconnected(conn -> onDisconnected.set(conn.channel().isActive() && handlerAvailable.test(conn))); Mono response = localClient.get() diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java new file mode 100644 index 0000000000..d708754cf2 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONCURRENT_STREAMS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONNECTIONS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MIN_CONNECTIONS; + +class Http2AllocationStrategyTest { + private Http2AllocationStrategy.Builder builder; + + @BeforeEach + void setUp() { + builder = Http2AllocationStrategy.builder(); + } + + @Test + void build() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(1); + } + + @Test + void buildBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build()) + .withMessage("minConnections (2) must be less than or equal to maxConnections (1)"); + } + + @Test + void copy() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + Http2AllocationStrategy copy = strategy.copy(); + assertThat(copy.maxConcurrentStreams()).isEqualTo(strategy.maxConcurrentStreams()); + assertThat(copy.permitMaximum()).isEqualTo(strategy.permitMaximum()); + assertThat(copy.permitMinimum()).isEqualTo(strategy.permitMinimum()); + } + + @Test + void maxConcurrentStreams() { + builder.maxConcurrentStreams(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void maxConcurrentStreamsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConcurrentStreams(-2)) + .withMessage("maxConcurrentStreams must be greater than or equal to -1"); + } + + @Test + void permitMaximum() { + builder.maxConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void permitMaximumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(0)) + .withMessage("maxConnections must be strictly positive"); + } + + @Test + void permitMinimum() { + builder.minConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(2); + } + + @Test + void permitMinimumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.minConnections(-1)) + .withMessage("minConnections must be positive or zero"); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index dc24581f27..6780f795c2 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,9 +22,9 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; import reactor.netty.internal.shaded.reactor.pool.PoolConfig; @@ -38,9 +38,9 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; class Http2PoolTest { @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -61,21 +61,26 @@ void acquireInvalidate() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -92,7 +97,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -100,21 +105,26 @@ void acquireRelease() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -134,15 +144,16 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -153,19 +164,22 @@ void evictClosedConnection() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired2.poolable(); ChannelId id2 = connection.channel().id(); @@ -174,8 +188,9 @@ void evictClosedConnection() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -186,26 +201,37 @@ void evictClosedConnection() throws Exception { } @Test - void evictClosedConnectionMaxConnectionsNotReached() throws Exception { + void evictClosedConnectionMaxConnectionsNotReached_1() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(false); + } + + @Test + void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(true); + } + + private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); return Connection.from(channel); })) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -216,25 +242,53 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); PooledRef acquired2 = http2Pool.acquire().block(); - assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); - assertThat(http2Pool.connections.size()).isEqualTo(2); + + AtomicReference> acquired3 = new AtomicReference<>(); + http2Pool.acquire().subscribe(acquired3::set); connection = acquired2.poolable(); - ChannelId id2 = connection.channel().id(); + ((EmbeddedChannel) connection.channel()).runPendingTasks(); + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(2L * Integer.MAX_VALUE); + + if (closeSecond) { + latch = new CountDownLatch(1); + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.onDispose(latch::countDown); + connection.dispose(); + + assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); + } + + ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + + acquired3.get().invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + if (closeSecond) { + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + else { + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + } } finally { if (connection != null) { @@ -256,15 +310,16 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); CountDownLatch latch = new CountDownLatch(1); @@ -274,21 +329,24 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -298,6 +356,347 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { } } + @Test + void evictInBackgroundClosedConnection() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1) + .evictInBackground(Duration.ofSeconds(5)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); + + Connection connection = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection = acquired1.poolable(); + ChannelId id1 = connection.channel().id(); + CountDownLatch latch = new CountDownLatch(1); + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.onDispose(latch::countDown); + connection.dispose(); + + assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + acquired1.invalidate().block(); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection = acquired2.poolable(); + ChannelId id2 = connection.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection != null) { + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.dispose(); + } + } + } + + @Test + void evictInBackgroundMaxIdleTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1) + .evictInBackground(Duration.ofSeconds(5)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired1.invalidate().block(); + + Thread.sleep(15); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + Thread.sleep(15); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void evictInBackgroundMaxLifeTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1) + .evictInBackground(Duration.ofSeconds(5)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + Thread.sleep(10); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + acquired1.invalidate().block(); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + Thread.sleep(10); + + http2Pool.evictInBackground(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void maxIdleTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired1.invalidate().block(); + + Thread.sleep(15); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void maxIdleTimeActiveStreams() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + List> acquired = new ArrayList<>(); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + + connection1 = acquired.get(0).poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired.get(0).invalidate().block(); + + Thread.sleep(15); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + + connection2 = acquired.get(1).poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isEqualTo(id2); + + acquired.get(1).invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxLifeTime() throws Exception { PoolBuilder> poolBuilder = @@ -310,7 +709,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -318,27 +717,31 @@ void maxLifeTime() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -347,8 +750,9 @@ void maxLifeTime() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -374,7 +778,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -382,22 +786,25 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); - Thread.sleep(10); + Thread.sleep(50); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -407,8 +814,9 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -434,35 +842,39 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection = null; try { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -473,11 +885,101 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { } @Test - void minConnectionsConfigNotSupported() { + void minConnections() { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .subscribe(); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(3); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } + } + + @Test + void minConnectionsMaxStreamsReached() { PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1))); + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .blockLast(Duration.ofSeconds(1)); + + assertThat(acquired).hasSize(3); + + for (PooledRef pooledRef : acquired) { + ((EmbeddedChannel) pooledRef.poolable().channel()).runPendingTasks(); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } } @Test @@ -488,24 +990,27 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); assertThat(acquired).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); acquired.invalidate().block(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { channel.finishAndReleaseAll(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 6ee8a43053..0cad951be8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -15,6 +15,10 @@ */ package reactor.netty.resources; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -28,7 +32,9 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -43,6 +49,7 @@ import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; import reactor.netty.http.HttpProtocol; +import reactor.netty.http.client.Http2AllocationStrategy; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; @@ -62,18 +69,41 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; +import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; +import static reactor.netty.Metrics.NAME; +import static reactor.netty.Metrics.REMOTE_ADDRESS; +import static reactor.netty.Metrics.TOTAL_CONNECTIONS; +import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; class DefaultPooledConnectionProviderTest extends BaseHttpTest { static SelfSignedCertificate ssc; + private MeterRegistry registry; + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); } + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + Metrics.addRegistry(registry); + } + + @AfterEach + void tearDown() { + Metrics.removeRegistry(registry); + registry.clear(); + registry.close(); + } + @Test void testIssue903() { Http11SslContextSpec serverCtx = Http11SslContextSpec.forServer(ssc.key(), ssc.cert()); @@ -290,7 +320,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, } @Test - void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { + void testConnectionIdleWhenNoActiveStreams() throws Exception { Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() @@ -307,19 +337,31 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { int requestsNum = 10; CountDownLatch latch = new CountDownLatch(1); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionReturnedToParentPoolWhenNoActiveStreams", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionIdleWhenNoActiveStreams", 5); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = createClient(provider, disposableServer.port()) - .wiretap(false) + .wiretap(false) .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); @@ -328,7 +370,7 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { .flatMap(i -> client.post() .uri("/") - .send(ByteBufMono.fromString(Mono.just("testConnectionReturnedToParentPoolWhenNoActiveStreams"))) + .send(ByteBufMono.fromString(Mono.just("testConnectionIdleWhenNoActiveStreams"))) .responseContent() .aggregate() .asString()) @@ -336,14 +378,16 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(provider.channelPools).hasSize(1); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testConnectionIdleWhenNoActiveStreams"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -442,21 +486,33 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie .bindNow(); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("doTestIssue1982", 5); CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient mainClient = clientCtx != null ? HttpClient.create(provider).port(disposableServer.port()).secure(sslContextSpec -> sslContextSpec.sslContext(clientCtx)) : HttpClient.create(provider).port(disposableServer.port()); HttpClient client = mainClient.protocol(clientProtocols) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); try { @@ -471,12 +527,96 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "doTestIssue1982"); + assertThat(totalConn).isEqualTo(idleConn); + } + finally { + provider.disposeLater() + .block(Duration.ofSeconds(5)); + } + } + + //https://github.com/reactor/reactor-netty/issues/1808 + @Test + void testMinConnections() throws Exception { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + disposableServer = + createServer() + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(serverCtx)) + .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain()))) + .bindNow(); + + int requestsNum = 100; + CountDownLatch latch = new CountDownLatch(1); + DefaultPooledConnectionProvider provider = + (DefaultPooledConnectionProvider) ConnectionProvider.builder("testMinConnections") + .allocationStrategy(Http2AllocationStrategy.builder().maxConnections(20).minConnections(5).build()) + .build(); + AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); + HttpClient client = + createClient(provider, disposableServer.port()) + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .observe((conn, state) -> { + if (state == STREAM_CONFIGURED) { + counter.incrementAndGet(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); + } + }); + + try { + Flux.range(0, requestsNum) + .flatMap(i -> + client.post() + .uri("/") + .send(ByteBufMono.fromString(Mono.just("testMinConnections"))) + .responseContent() + .aggregate() + .asString()) + .blockLast(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testMinConnections"); + assertThat(totalConn).isEqualTo(idleConn); + assertThat(totalConn).isLessThan(10); } finally { provider.disposeLater() @@ -512,4 +652,13 @@ public boolean trySuccess(Void result) { return r; } } + + private double getGaugeValue(String gaugeName, String... tags) { + Gauge gauge = registry.find(gaugeName).tags(tags).gauge(); + double result = -1; + if (gauge != null) { + result = gauge.value(); + } + return result; + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index ac752e082f..a466ff3982 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -207,15 +207,17 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole assertThat(metrics.get()).isTrue(); if (isSecured) { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(1); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, "http2." + poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, "http2." + poolName)).isEqualTo(1); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, "http2." + poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2." + poolName)).isEqualTo(0); } else { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(0); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); } - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_CONNECTIONS, poolName)).isEqualTo(expectedMaxConnection); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire);