From ce97087b6abf0d9e05e8afc4d4b2f2cd8bfc5a71 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 30 May 2022 12:22:34 +0300 Subject: [PATCH 01/11] Http2Pool handles the lifecycle of the cache with connections Cache Http2FrameCodec/Http2MultiplexHandler context --- .../resources/PooledConnectionProvider.java | 10 +- .../reactor/netty/http/HttpResources.java | 10 + .../http/client/Http2ConnectionProvider.java | 15 +- .../reactor/netty/http/client/Http2Pool.java | 218 ++++++++++++------ ...Http2ConnectionProviderMeterRegistrar.java | 17 +- .../netty/http/client/Http2PoolTest.java | 138 +++++++---- .../DefaultPooledConnectionProviderTest.java | 120 +++++++--- ...dConnectionProviderDefaultMetricsTest.java | 8 +- 8 files changed, 377 insertions(+), 159 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index a19816e512..25947313e2 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -89,6 +89,7 @@ public abstract class PooledConnectionProvider implements final Duration poolInactivity; final Duration disposeTimeout; final Map maxConnections = new HashMap<>(); + Mono onDispose; protected PooledConnectionProvider(Builder builder) { this(builder, null); @@ -106,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); maxConnections.put(entry.getKey(), entry.getValue().maxConnections); } + this.onDispose = Mono.empty(); scheduleInactivePoolsDisposal(); } @@ -190,10 +192,10 @@ public final Mono disposeLater() { }) .collect(Collectors.toList()); if (pools.isEmpty()) { - return Mono.empty(); + return onDispose; } channelPools.clear(); - return Mono.when(pools); + return onDispose.and(Mono.when(pools)); }); } @@ -243,6 +245,10 @@ public String name() { return name; } + public void onDispose(Mono disposeMono) { + onDispose = onDispose.and(disposeMono); + } + protected abstract CoreSubscriber> createDisposableAcquire( TransportConfig config, ConnectionObserver connectionObserver, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java index 801dcac824..a94268145b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java @@ -15,6 +15,7 @@ */ package reactor.netty.http; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) { http2ConnectionProvider = new AtomicReference<>(); } + @Override + public void disposeWhen(SocketAddress remoteAddress) { + ConnectionProvider provider = http2ConnectionProvider.get(); + if (provider != null) { + provider.disposeWhen(remoteAddress); + } + super.disposeWhen(remoteAddress); + } + @Override public AddressResolverGroup getOrCreateDefaultResolver() { return super.getOrCreateDefaultResolver(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index cf76c73902..59ae2224e7 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -16,6 +16,7 @@ package reactor.netty.http.client; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameCodec; @@ -76,6 +77,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider Http2ConnectionProvider(ConnectionProvider parent) { super(initConfiguration(parent)); this.parent = parent; + if (parent instanceof PooledConnectionProvider) { + ((PooledConnectionProvider) parent).onDispose(disposeLater()); + } } static Builder initConfiguration(ConnectionProvider parent) { @@ -332,11 +336,12 @@ public void onUncaughtException(Connection connection, Throwable error) { @Override public void operationComplete(Future future) { Channel channel = pooledRef.poolable().channel(); - Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class); + ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx(); if (future.isSuccess()) { Http2StreamChannel ch = future.getNow(); - if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) { + if (!channel.isActive() || frameCodec == null || + !((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) { invalidate(this); if (!retried) { if (log.isDebugEnabled()) { @@ -358,8 +363,8 @@ public void operationComplete(Future future) { sink.success(ops); } - Http2Connection.Endpoint localEndpoint = frameCodec.connection().local(); if (log.isDebugEnabled()) { + Http2Connection.Endpoint localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local(); logStreamsState(ch, localEndpoint, "Stream opened"); } } @@ -373,7 +378,7 @@ public void operationComplete(Future future) { boolean isH2cUpgrade() { Channel channel = pooledRef.poolable().channel(); if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null && - channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) { + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { sink.success(ops); @@ -408,7 +413,7 @@ else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) } } else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && - pipeline.get(NettyPipeline.H2MultiplexHandler) == null) { + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { // It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge, // continue as an HTTP/1.1 request and remove the connection from this pool. ChannelOperations ops = ChannelOperations.get(channel); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a86dd87f3b..864b32e430 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -24,9 +24,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -36,8 +39,7 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; +import reactor.netty.FutureMono; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -59,7 +61,6 @@ *
    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • - *
  • The connection has no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -75,9 +76,9 @@ *

* This pool always invalidate the {@link PooledRef}, there is no release functionality. *

    - *
  • {@link PoolMetrics#acquiredSize()} and {@link PoolMetrics#allocatedSize()} always return the number of - * the active streams from all connections currently in the pool.
  • - *
  • {@link PoolMetrics#idleSize()} always returns {@code 0}.
  • + *
  • {@link PoolMetrics#acquiredSize()}, {@link PoolMetrics#allocatedSize()} and {@link PoolMetrics#idleSize()} + * always return the number of the cached connections.
  • + *
  • {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
  • *
*

* Configurations that are not applicable @@ -114,6 +115,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. static final AtomicReferenceFieldUpdater CONNECTIONS = AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections"); + volatile int idleSize; + private static final AtomicIntegerFieldUpdater IDLE_SIZE = + AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize"); + /** * Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that, * use addPending/pollPending/removePending methods which take care of maintaining the pending queue size. @@ -171,12 +176,12 @@ public Mono> acquire(Duration timeout) { @Override public int acquiredSize() { - return acquired; + return allocatedSize() - idleSize(); } @Override public int allocatedSize() { - return acquired; + return poolConfig.allocationStrategy().permitGranted(); } @Override @@ -197,10 +202,19 @@ public Mono disposeLater() { p.fail(new PoolShutdownException()); } - // the last stream on that connection will release the connection to the parent pool - // the structure should not contain connections with 0 streams as the last stream on that connection - // always removes the connection from this pool - CONNECTIONS.getAndSet(this, null); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.getAndSet(this, null); + if (slots != null) { + Mono closeMonos = Mono.empty(); + while (!slots.isEmpty()) { + Slot slot = pollSlot(slots); + if (slot != null) { + slot.invalidate(); + closeMonos = closeMonos.and(DEFAULT_DESTROY_HANDLER.apply(slot.connection)); + } + } + return closeMonos; + } } return Mono.empty(); }); @@ -218,7 +232,7 @@ public int getMaxPendingAcquireSize() { @Override public int idleSize() { - return 0; + return idleSize; } @Override @@ -253,6 +267,10 @@ public Mono warmup() { return Mono.just(0); } + int activeStreams() { + return acquired; + } + void cancelAcquire(Borrower borrower) { if (!isDisposed()) { ConcurrentLinkedDeque q = pending; @@ -260,15 +278,32 @@ void cancelAcquire(Borrower borrower) { } } + @SuppressWarnings("FutureReturnValueIgnored") Mono destroyPoolable(Http2PooledRef ref) { + assert ref.slot.connection.channel().eventLoop().inEventLoop(); Mono mono = Mono.empty(); try { + // By default, check the connection for removal on acquire and invalidate (only if there are no active streams) if (ref.slot.decrementConcurrencyAndGet() == 0) { - ref.slot.invalidate(); - Connection connection = ref.poolable(); - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - if (frameCodec != null) { - releaseConnection(connection); + // not HTTP/2 request + if (ref.slot.http2FrameCodecCtx() == null) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // If there is eviction in background, the background process will remove this connection + else if (poolConfig.evictInBackgroundInterval().isZero()) { + // not active + if (!ref.poolable().channel().isActive()) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // max life reached + else if (maxLifeReached(ref.slot)) { + //"FutureReturnValueIgnored" this is deliberate + ref.slot.connection.channel().close(); + ref.slot.invalidate(); + removeSlot(ref.slot); + } } } } @@ -315,27 +350,22 @@ void drainLoop() { if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { - resources.offer(slot); + offerSlot(resources, slot); continue; } if (isDisposed()) { borrower.fail(new PoolShutdownException()); return; } - if (slot.incrementConcurrencyAndGet() > 1) { - borrower.stopPendingCountdown(); - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - // we are ready here, the connection can be used for opening another stream - slot.deactivate(); - poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot))); - } - else { - addPending(borrowers, borrower, true); - continue; + borrower.stopPendingCountdown(); + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Channel activated")); } + ACQUIRED.incrementAndGet(this); + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); + drain(); + }); } else { int permits = poolConfig.allocationStrategy().getPermits(1); @@ -372,8 +402,6 @@ void drainLoop() { log.debug(format(newInstance.channel(), "Channel activated")); } ACQUIRED.incrementAndGet(this); - newSlot.incrementConcurrencyAndGet(); - newSlot.deactivate(); borrower.deliver(new Http2PooledRef(newSlot)); } else if (sig.isOnError()) { @@ -398,15 +426,16 @@ else if (sig.isOnError()) { } @Nullable + @SuppressWarnings("FutureReturnValueIgnored") Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = resources.size(); + int resourcesCount = idleSize; while (resourcesCount > 0) { // There are connections in the queue resourcesCount--; // get the connection - Slot slot = resources.poll(); + Slot slot = pollSlot(resources); if (slot == null) { continue; } @@ -418,38 +447,40 @@ Slot findConnection(ConcurrentLinkedQueue resources) { log.debug(format(slot.connection.channel(), "Channel is closed, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool")); } - resources.remove(slot); + slot.invalidate(); } continue; } // check that the connection's max lifetime has not been reached - if (maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime) { + if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); } - resources.remove(slot); + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); } continue; } // check that the connection's max active streams has not been reached if (!slot.canOpenStream()) { - resources.offer(slot); + offerSlot(resources, slot); if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max active streams is reached")); } @@ -462,6 +493,10 @@ Slot findConnection(ConcurrentLinkedQueue resources) { return null; } + boolean maxLifeReached(Slot slot) { + return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime; + } + void pendingAcquireLimitReached(Borrower borrower, int maxPending) { if (maxPending == 0) { borrower.fail(new PoolAcquirePendingLimitException(0, @@ -530,33 +565,40 @@ int addPending(ConcurrentLinkedDeque borrowers, Borrower borrower, boo return PENDING_SIZE.incrementAndGet(this); } - static boolean offerSlot(Slot slot) { - @SuppressWarnings("unchecked") - ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - return q != null && q.offer(slot); + void offerSlot(@Nullable ConcurrentLinkedQueue slots, Slot slot) { + if (slots != null && slots.offer(slot)) { + IDLE_SIZE.incrementAndGet(this); + } } - static void releaseConnection(Connection connection) { - ChannelOperations ops = connection.as(ChannelOperations.class); - if (ops != null) { - ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING); - } - else if (connection instanceof ConnectionObserver) { - ((ConnectionObserver) connection).onStateChange(connection, ConnectionObserver.State.DISCONNECTING); + @Nullable + Slot pollSlot(@Nullable ConcurrentLinkedQueue slots) { + if (slots == null) { + return null; } - else { - connection.dispose(); + Slot slot = slots.poll(); + if (slot != null) { + IDLE_SIZE.decrementAndGet(this); } + return slot; } - static void removeSlot(Slot slot) { + void removeSlot(Slot slot) { @SuppressWarnings("unchecked") ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - if (q != null) { - q.remove(slot); + if (q != null && q.remove(slot)) { + IDLE_SIZE.decrementAndGet(this); } } + static final Function> DEFAULT_DESTROY_HANDLER = + connection -> { + if (!connection.channel().isActive()) { + return Mono.empty(); + } + return FutureMono.from(connection.channel().close()); + }; + static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable { static final Disposable TIMEOUT_DISPOSED = Disposables.disposed(); @@ -589,7 +631,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - if (!acquireTimeout.isZero()) { + // Cannot rely on idleSize because there might be idle connections but not suitable for use + int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); + int pending = pool.pendingSize; + if (!acquireTimeout.isZero() && permits <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -627,7 +672,9 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - stopPendingCountdown(); + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + poolSlot.slot.incrementConcurrencyAndGet(); + poolSlot.slot.deactivate(); if (get()) { //CANCELLED or timeout reached poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); @@ -737,7 +784,7 @@ public String toString() { } } - static final class Slot { + static final class Slot extends AtomicBoolean { volatile int concurrency; static final AtomicIntegerFieldUpdater CONCURRENCY = @@ -747,6 +794,9 @@ static final class Slot { final long creationTimestamp; final Http2Pool pool; + volatile ChannelHandlerContext http2FrameCodecCtx; + volatile ChannelHandlerContext http2MultiplexHandlerCtx; + Slot(Http2Pool pool, Connection connection) { this.connection = connection; this.creationTimestamp = pool.clock.millis(); @@ -754,10 +804,9 @@ static final class Slot { } boolean canOpenStream() { - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - Http2MultiplexHandler multiplexHandler = connection.channel().pipeline().get(Http2MultiplexHandler.class); - if (frameCodec != null && multiplexHandler != null) { - int maxActiveStreams = frameCodec.connection().local().maxActiveStreams(); + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); int concurrency = this.concurrency; return concurrency < maxActiveStreams; } @@ -772,23 +821,48 @@ void deactivate() { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Channel deactivated")); } - offerSlot(this); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.get(pool); + pool.offerSlot(slots, this); } int decrementConcurrencyAndGet() { return CONCURRENCY.decrementAndGet(this); } - int incrementConcurrencyAndGet() { - return CONCURRENCY.incrementAndGet(this); + @Nullable + ChannelHandlerContext http2FrameCodecCtx() { + ChannelHandlerContext ctx = http2FrameCodecCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2FrameCodec.class); + http2FrameCodecCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext http2MultiplexHandlerCtx() { + ChannelHandlerContext ctx = http2MultiplexHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2MultiplexHandler.class); + http2MultiplexHandlerCtx = ctx; + return ctx; + } + + void incrementConcurrencyAndGet() { + CONCURRENCY.incrementAndGet(this); } void invalidate() { - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), "Channel removed from pool")); + if (compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug(format(connection.channel(), "Channel removed from pool")); + } + pool.poolConfig.allocationStrategy().returnPermits(1); } - pool.poolConfig.allocationStrategy().returnPermits(1); - removeSlot(this); } long lifeTime() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 08bd8ee6af..3543fe1608 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -22,16 +22,21 @@ import java.net.SocketAddress; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; import static reactor.netty.Metrics.ACTIVE_STREAMS; import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; import static reactor.netty.Metrics.ID; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; import static reactor.netty.Metrics.NAME; import static reactor.netty.Metrics.PENDING_STREAMS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { + static final String ACTIVE_CONNECTIONS_DESCRIPTION = + "The number of the connections that have been successfully acquired and are in active use"; static final String ACTIVE_STREAMS_DESCRIPTION = "The number of the active HTTP/2 streams"; + static final String IDLE_CONNECTIONS_DESCRIPTION = "The number of the idle connections"; static final String PENDING_STREAMS_DESCRIPTION = "The number of requests that are waiting for opening HTTP/2 stream"; @@ -45,11 +50,21 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In String addressAsString = Metrics.formatSocketAddress(remoteAddress); Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName); - Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + .description(ACTIVE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) .description(ACTIVE_STREAMS_DESCRIPTION) .tags(tags) .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::idleSize) + .description(IDLE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, metrics, InstrumentedPool.PoolMetrics::pendingAcquireSize) .description(PENDING_STREAMS_DESCRIPTION) .tags(tags) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index dc24581f27..ec68ef3669 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; import reactor.netty.internal.shaded.reactor.pool.PoolConfig; @@ -38,6 +37,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -61,21 +61,23 @@ void acquireInvalidate() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -92,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -100,21 +102,23 @@ void acquireRelease() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -141,7 +145,7 @@ void evictClosedConnection() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -153,18 +157,18 @@ void evictClosedConnection() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired2.poolable(); @@ -174,8 +178,8 @@ void evictClosedConnection() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection != null) { @@ -186,12 +190,22 @@ void evictClosedConnection() throws Exception { } @Test - void evictClosedConnectionMaxConnectionsNotReached() throws Exception { + void evictClosedConnectionMaxConnectionsNotReached_1() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(false); + } + + @Test + void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(true); + } + + private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); return Connection.from(channel); })) .idleResourceReuseLruOrder() @@ -204,7 +218,7 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -216,25 +230,48 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); - assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); - assertThat(http2Pool.connections.size()).isEqualTo(2); + + AtomicReference> acquired3 = new AtomicReference<>(); + http2Pool.acquire().subscribe(acquired3::set); connection = acquired2.poolable(); - ChannelId id2 = connection.channel().id(); + ((EmbeddedChannel) connection.channel()).runPendingTasks(); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.connections.size()).isEqualTo(2); + + if (closeSecond) { + latch = new CountDownLatch(1); + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.onDispose(latch::countDown); + connection.dispose(); + assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); + } + + ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + acquired3.get().invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + if (closeSecond) { + assertThat(http2Pool.connections.size()).isEqualTo(0); + } + else { + assertThat(http2Pool.connections.size()).isEqualTo(1); + } } finally { if (connection != null) { @@ -263,7 +300,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -274,7 +311,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -282,12 +319,12 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -318,7 +355,7 @@ void maxLifeTime() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); @@ -326,18 +363,18 @@ void maxLifeTime() throws Exception { Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection2 = acquired2.poolable(); @@ -347,8 +384,8 @@ void maxLifeTime() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -374,7 +411,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); Connection connection1 = null; Connection connection2 = null; @@ -382,21 +419,21 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); - Thread.sleep(10); + Thread.sleep(50); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); connection2 = acquired2.poolable(); @@ -407,8 +444,8 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -441,14 +478,14 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -456,12 +493,12 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -488,24 +525,25 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); assertThat(acquired).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); acquired.invalidate().block(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 6ee8a43053..32453a4c24 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -15,6 +15,10 @@ */ package reactor.netty.resources; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -28,7 +32,9 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -62,18 +68,41 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; +import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; +import static reactor.netty.Metrics.NAME; +import static reactor.netty.Metrics.REMOTE_ADDRESS; +import static reactor.netty.Metrics.TOTAL_CONNECTIONS; +import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; class DefaultPooledConnectionProviderTest extends BaseHttpTest { static SelfSignedCertificate ssc; + private MeterRegistry registry; + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); } + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + Metrics.addRegistry(registry); + } + + @AfterEach + void tearDown() { + Metrics.removeRegistry(registry); + registry.clear(); + registry.close(); + } + @Test void testIssue903() { Http11SslContextSpec serverCtx = Http11SslContextSpec.forServer(ssc.key(), ssc.cert()); @@ -290,7 +319,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, } @Test - void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { + void testConnectionIdleWhenNoActiveStreams() throws Exception { Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() @@ -307,19 +336,31 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { int requestsNum = 10; CountDownLatch latch = new CountDownLatch(1); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionReturnedToParentPoolWhenNoActiveStreams", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionIdleWhenNoActiveStreams", 5); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = createClient(provider, disposableServer.port()) - .wiretap(false) + .wiretap(false) .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); @@ -328,7 +369,7 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { .flatMap(i -> client.post() .uri("/") - .send(ByteBufMono.fromString(Mono.just("testConnectionReturnedToParentPoolWhenNoActiveStreams"))) + .send(ByteBufMono.fromString(Mono.just("testConnectionIdleWhenNoActiveStreams"))) .responseContent() .aggregate() .asString()) @@ -336,14 +377,16 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(provider.channelPools).hasSize(1); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testConnectionIdleWhenNoActiveStreams"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -442,21 +485,33 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie .bindNow(); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("doTestIssue1982", 5); CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient mainClient = clientCtx != null ? HttpClient.create(provider).port(disposableServer.port()).secure(sslContextSpec -> sslContextSpec.sslContext(clientCtx)) : HttpClient.create(provider).port(disposableServer.port()); HttpClient client = mainClient.protocol(clientProtocols) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); try { @@ -471,12 +526,16 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "doTestIssue1982"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -512,4 +571,13 @@ public boolean trySuccess(Void result) { return r; } } + + private double getGaugeValue(String gaugeName, String... tags) { + Gauge gauge = registry.find(gaugeName).tags(tags).gauge(); + double result = -1; + if (gauge != null) { + result = gauge.value(); + } + return result; + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index ac752e082f..a466ff3982 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -207,15 +207,17 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole assertThat(metrics.get()).isTrue(); if (isSecured) { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(1); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, "http2." + poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, "http2." + poolName)).isEqualTo(1); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, "http2." + poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2." + poolName)).isEqualTo(0); } else { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(0); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); } - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_CONNECTIONS, poolName)).isEqualTo(expectedMaxConnection); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire); From cfb70d55b5c90dd435780eb32c3e1ebc7e489baa Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 6 Jun 2022 11:18:25 +0300 Subject: [PATCH 02/11] Add maxIdleTime to Http2Pool --- .../resources/PooledConnectionProvider.java | 4 + .../http/client/Http2ConnectionProvider.java | 2 +- .../reactor/netty/http/client/Http2Pool.java | 32 ++++- .../netty/http/client/Http2PoolTest.java | 134 ++++++++++++++++-- 4 files changed, 158 insertions(+), 14 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 25947313e2..0823c7562a 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -455,6 +455,10 @@ PoolBuilder> newPoolInternal( return poolBuilder; } + public long maxIdleTime() { + return this.maxIdleTime; + } + public long maxLifeTime() { return maxLifeTime; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 59ae2224e7..fdc6bd77c4 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -471,7 +471,7 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime())); + poolConFig -> new Http2Pool(poolConFig, poolFactory.maxIdleTime(), poolFactory.maxLifeTime())); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 864b32e430..9be17a4f46 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -61,6 +61,7 @@ *

    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • + *
  • The connection has reached its idle time and there are no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -145,18 +146,20 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final long maxIdleTime; final long maxLifeTime; final PoolConfig poolConfig; long lastInteractionTimestamp; - Http2Pool(PoolConfig poolConfig, long maxLifeTime) { + Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) { if (poolConfig.allocationStrategy().getPermits(0) != 0) { throw new IllegalArgumentException("No support for configuring minimum number of connections"); } this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -458,7 +461,18 @@ Slot findConnection(ConcurrentLinkedQueue resources) { continue; } - // check that the connection's max lifetime has not been reached + // check whether the connection's idle time has been reached + if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); + continue; + } + + // check whether the connection's max lifetime has been reached if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { @@ -794,6 +808,8 @@ static final class Slot extends AtomicBoolean { final long creationTimestamp; final Http2Pool pool; + long idleTimestamp; + volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; @@ -827,7 +843,17 @@ void deactivate() { } int decrementConcurrencyAndGet() { - return CONCURRENCY.decrementAndGet(this); + int concurrency = CONCURRENCY.decrementAndGet(this); + idleTimestamp = pool.clock.millis(); + return concurrency; + } + + long idleTime() { + if (concurrency() > 0) { + return 0L; + } + long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; + return pool.clock.millis() - idleTime; } @Nullable diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index ec68ef3669..bc1ec1a561 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -94,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -335,6 +335,120 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { } } + @Test + void maxIdleTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired1.invalidate().block(); + + Thread.sleep(15); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void maxIdleTimeActiveStreams() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + List> acquired = new ArrayList<>(); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired.get(0).poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired.get(0).invalidate().block(); + + Thread.sleep(15); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired.get(1).poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isEqualTo(id2); + + acquired.get(1).invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxLifeTime() throws Exception { PoolBuilder> poolBuilder = @@ -347,7 +461,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -411,7 +525,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -471,7 +585,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection = null; try { @@ -514,7 +628,7 @@ void minConnectionsConfigNotSupported() { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1))); + .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1))); } @Test @@ -525,7 +639,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); From a2aa3a588a02830126327c3c1fa13130bf828b11 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 6 Jun 2022 11:22:08 +0300 Subject: [PATCH 03/11] Add evictInBackground to Http2Pool --- .../reactor/netty/http/client/Http2Pool.java | 88 +++++++- .../netty/http/client/Http2PoolTest.java | 207 ++++++++++++++++++ 2 files changed, 291 insertions(+), 4 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 9be17a4f46..31b2ff3308 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -17,6 +17,7 @@ import java.time.Clock; import java.time.Duration; +import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; @@ -85,9 +86,6 @@ * Configurations that are not applicable * *

+ * If minimum connections is specified, the cached connections with active streams will be kept at that minimum + * (can be the best effort). However, if the cached connections have reached max concurrent streams, + * then new connections will be allocated up to the maximum connections limit. + *

* Configurations that are not applicable *

    *
  • {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
  • @@ -94,7 +100,6 @@ *
  • {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
  • *
  • FIFO is used when obtaining the pending borrowers
  • *
  • Warm up functionality is not supported
  • - *
  • Setting minimum connections configuration is not supported
  • *
*

This class is based on * https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java @@ -144,23 +149,27 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final Long maxConcurrentStreams; final long maxIdleTime; final long maxLifeTime; + final int minConnections; final PoolConfig poolConfig; + final LongAdder totalMaxConcurrentStreams = new LongAdder(); long lastInteractionTimestamp; Disposable evictionTask; - Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) { - if (poolConfig.allocationStrategy().getPermits(0) != 0) { - throw new IllegalArgumentException("No support for configuring minimum number of connections"); - } + Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy, + long maxIdleTime, long maxLifeTime) { this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? + ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; + this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -353,7 +362,10 @@ void drainLoop() { if (borrowersCount != 0) { // find a connection that can be used for opening a new stream - Slot slot = findConnection(resources); + // when cached connections are below minimum connections, then allocate a new connection + boolean belowMinConnections = minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() < minConnections; + Slot slot = belowMinConnections ? null : findConnection(resources); if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { @@ -375,52 +387,64 @@ void drainLoop() { }); } else { - int permits = poolConfig.allocationStrategy().getPermits(1); - if (permits <= 0) { - if (maxPending >= 0) { - borrowersCount = pendingSize; - int toCull = borrowersCount - maxPending; - for (int i = 0; i < toCull; i++) { - Borrower extraneous = pollPending(borrowers, true); - if (extraneous != null) { - pendingAcquireLimitReached(extraneous, maxPending); - } - } - } + int resourcesCount = idleSize; + if (minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() >= minConnections && + resourcesCount == 0) { + // connections allocations were triggered } else { - Borrower borrower = pollPending(borrowers, true); - if (borrower == null) { - continue; + int permits = poolConfig.allocationStrategy().getPermits(1); + if (permits <= 0) { + if (maxPending >= 0) { + borrowersCount = pendingSize; + int toCull = borrowersCount - maxPending; + for (int i = 0; i < toCull; i++) { + Borrower extraneous = pollPending(borrowers, true); + if (extraneous != null) { + pendingAcquireLimitReached(extraneous, maxPending); + } + } + } } - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; + else { + if (permits > 1) { + // warmup is not supported + poolConfig.allocationStrategy().returnPermits(permits - 1); + } + Borrower borrower = pollPending(borrowers, true); + if (borrower == null) { + continue; + } + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; + } + borrower.stopPendingCountdown(); + Mono allocator = poolConfig.allocator(); + Mono primary = + allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + Connection newInstance = sig.get(); + assert newInstance != null; + Slot newSlot = new Slot(this, newInstance); + if (log.isDebugEnabled()) { + log.debug(format(newInstance.channel(), "Channel activated")); + } + ACQUIRED.incrementAndGet(this); + borrower.deliver(new Http2PooledRef(newSlot)); + } + else if (sig.isOnError()) { + Throwable error = sig.getThrowable(); + assert error != null; + poolConfig.allocationStrategy().returnPermits(1); + borrower.fail(error); + } + }) + .contextWrite(borrower.currentContext()); + + primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } - borrower.stopPendingCountdown(); - Mono allocator = poolConfig.allocator(); - Mono primary = - allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - Connection newInstance = sig.get(); - assert newInstance != null; - Slot newSlot = new Slot(this, newInstance); - if (log.isDebugEnabled()) { - log.debug(format(newInstance.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - borrower.deliver(new Http2PooledRef(newSlot)); - } - else if (sig.isOnError()) { - Throwable error = sig.getThrowable(); - assert error != null; - poolConfig.allocationStrategy().returnPermits(1); - borrower.fail(error); - } - }) - .contextWrite(borrower.currentContext()); - - primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } } } @@ -725,10 +749,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - // Cannot rely on idleSize because there might be idle connections but not suitable for use + long estimateStreamsCount = pool.totalMaxConcurrentStreams.longValue() - pool.acquired; int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; - if (!acquireTimeout.isZero() && permits <= pending) { + if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -889,6 +913,7 @@ static final class Slot extends AtomicBoolean { final Http2Pool pool; long idleTimestamp; + long maxConcurrentStreams; volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; @@ -897,12 +922,26 @@ static final class Slot extends AtomicBoolean { this.connection = connection; this.creationTimestamp = pool.clock.millis(); this.pool = pool; + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + this.maxConcurrentStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams : + Math.min(pool.maxConcurrentStreams, maxConcurrentStreams); + } + this.pool.totalMaxConcurrentStreams.add(this.maxConcurrentStreams); } boolean canOpenStream() { ChannelHandlerContext frameCodec = http2FrameCodecCtx(); if (frameCodec != null && http2MultiplexHandlerCtx() != null) { - int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + long maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + maxActiveStreams = pool.maxConcurrentStreams == -1 ? maxActiveStreams : + Math.min(pool.maxConcurrentStreams, maxActiveStreams); + long diff = maxActiveStreams - maxConcurrentStreams; + if (diff != 0) { + maxConcurrentStreams = maxActiveStreams; + pool.totalMaxConcurrentStreams.add(diff); + } int concurrency = this.concurrency; return concurrency < maxActiveStreams; } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java new file mode 100644 index 0000000000..6725cecfc9 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONCURRENT_STREAMS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONNECTIONS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MIN_CONNECTIONS; + +class Http2AllocationStrategyTest { + private Http2AllocationStrategy.Builder builder; + + @BeforeEach + void setUp() { + builder = Http2AllocationStrategy.builder(); + } + + @Test + void build() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(1); + } + + @Test + void buildBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build()) + .withMessage("minConnections must be less than or equal to maxConnections"); + } + + @Test + void copy() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + Http2AllocationStrategy copy = strategy.copy(); + assertThat(copy.maxConcurrentStreams()).isEqualTo(strategy.maxConcurrentStreams()); + assertThat(copy.permitMaximum()).isEqualTo(strategy.permitMaximum()); + assertThat(copy.permitMinimum()).isEqualTo(strategy.permitMinimum()); + } + + @Test + void maxConcurrentStreams() { + builder.maxConcurrentStreams(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void maxConcurrentStreamsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConcurrentStreams(-2)) + .withMessage("maxConcurrentStreams must be grater than or equal to -1"); + } + + @Test + void permitMaximum() { + builder.maxConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void permitMaximumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(0)) + .withMessage("maxConnections must be strictly positive"); + } + + @Test + void permitMinimum() { + builder.minConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(2); + } + + @Test + void permitMinimumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.minConnections(-1)) + .withMessage("minConnections must be positive or zero"); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index ec317c8828..32606b377e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -40,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; class Http2PoolTest { @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -94,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -554,7 +554,7 @@ void maxIdleTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); Connection connection1 = null; Connection connection2 = null; @@ -609,7 +609,7 @@ void maxIdleTimeActiveStreams() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); Connection connection1 = null; Connection connection2 = null; @@ -668,7 +668,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -732,7 +732,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -792,7 +792,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection = null; try { @@ -831,11 +831,97 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { } @Test - void minConnectionsConfigNotSupported() { + void minConnections() { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1))); + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .subscribe(); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(3); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable()); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } + } + + @Test + void minConnectionsMaxStreamsReached() { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .blockLast(Duration.ofSeconds(1)); + + assertThat(acquired).hasSize(3); + + for (PooledRef pooledRef : acquired) { + ((EmbeddedChannel) pooledRef.poolable().channel()).runPendingTasks(); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable()); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } } @Test @@ -846,7 +932,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 32453a4c24..0cad951be8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -49,6 +49,7 @@ import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; import reactor.netty.http.HttpProtocol; +import reactor.netty.http.client.Http2AllocationStrategy; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; @@ -543,6 +544,86 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie } } + //https://github.com/reactor/reactor-netty/issues/1808 + @Test + void testMinConnections() throws Exception { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + disposableServer = + createServer() + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(serverCtx)) + .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain()))) + .bindNow(); + + int requestsNum = 100; + CountDownLatch latch = new CountDownLatch(1); + DefaultPooledConnectionProvider provider = + (DefaultPooledConnectionProvider) ConnectionProvider.builder("testMinConnections") + .allocationStrategy(Http2AllocationStrategy.builder().maxConnections(20).minConnections(5).build()) + .build(); + AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); + HttpClient client = + createClient(provider, disposableServer.port()) + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .observe((conn, state) -> { + if (state == STREAM_CONFIGURED) { + counter.incrementAndGet(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); + } + }); + + try { + Flux.range(0, requestsNum) + .flatMap(i -> + client.post() + .uri("/") + .send(ByteBufMono.fromString(Mono.just("testMinConnections"))) + .responseContent() + .aggregate() + .asString()) + .blockLast(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testMinConnections"); + assertThat(totalConn).isEqualTo(idleConn); + assertThat(totalConn).isLessThan(10); + } + finally { + provider.disposeLater() + .block(Duration.ofSeconds(5)); + } + } + static final class TestPromise extends DefaultChannelPromise { final ChannelPromise parent; From 37653e3acc2cb9a3f0a86cdc000cfcb0e7a9c66e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 7 Jun 2022 13:45:38 +0300 Subject: [PATCH 07/11] Use better wording in javadoc and exception message Fix typo Adapt the junit tests --- .../reactor/netty/http/client/Http2AllocationStrategy.java | 7 ++++--- .../netty/http/client/Http2AllocationStrategyTest.java | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java index c879cae76c..2dd5c49c61 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java @@ -42,7 +42,7 @@ public interface Builder { /** * Configures the maximum number of the concurrent streams that can be opened to the remote peer. * When evaluating how many streams can be opened to the remote peer, - * the minimum of this configuration and the remote peer configuration is taken. + * the minimum of this configuration and the remote peer configuration is taken (unless -1 is used). * Default to {@code -1} - use always the remote peer configuration. * * @param maxConcurrentStreams the maximum number of the concurrent streams that can be opened to the remote peer @@ -174,7 +174,8 @@ static final class Build implements Builder { @Override public Http2AllocationStrategy build() { if (minConnections > maxConnections) { - throw new IllegalArgumentException("minConnections must be less than or equal to maxConnections"); + throw new IllegalArgumentException("minConnections (" + minConnections + ")" + + " must be less than or equal to maxConnections (" + maxConnections + ")"); } return new Http2AllocationStrategy(this); } @@ -182,7 +183,7 @@ public Http2AllocationStrategy build() { @Override public Builder maxConcurrentStreams(long maxConcurrentStreams) { if (maxConcurrentStreams < -1) { - throw new IllegalArgumentException("maxConcurrentStreams must be grater than or equal to -1"); + throw new IllegalArgumentException("maxConcurrentStreams must be greater than or equal to -1"); } this.maxConcurrentStreams = maxConcurrentStreams; return this; diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java index 6725cecfc9..d708754cf2 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java @@ -45,7 +45,7 @@ void build() { void buildBadValues() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build()) - .withMessage("minConnections must be less than or equal to maxConnections"); + .withMessage("minConnections (2) must be less than or equal to maxConnections (1)"); } @Test @@ -71,7 +71,7 @@ void maxConcurrentStreams() { void maxConcurrentStreamsBadValues() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> builder.maxConcurrentStreams(-2)) - .withMessage("maxConcurrentStreams must be grater than or equal to -1"); + .withMessage("maxConcurrentStreams must be greater than or equal to -1"); } @Test From 36441a69846c301f77ac57976cd69445e7919425 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 8 Jun 2022 09:21:35 +0300 Subject: [PATCH 08/11] Polish javadoc --- .../main/java/reactor/netty/resources/ConnectionProvider.java | 1 + 1 file changed, 1 insertion(+) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index 18d64514ac..db831832c9 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -498,6 +498,7 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) { * * @param maxConnections the maximum number of connections (per connection pool) before start pending * @return {@literal this} + * @see #allocationStrategy(AllocationStrategy) * @throws IllegalArgumentException if maxConnections is negative */ public final SPEC maxConnections(int maxConnections) { From b567f7ef82dfb627aa1aae3de2837b4611e52e26 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 8 Jun 2022 09:27:56 +0300 Subject: [PATCH 09/11] Use AtomicLongFieldUpdater instead of LongAdder --- .../java/reactor/netty/http/client/Http2Pool.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 66cb08d33b..e212e832db 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import io.netty.channel.ChannelHandlerContext; @@ -144,6 +144,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. @SuppressWarnings("rawtypes") static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque(); + volatile long totalMaxConcurrentStreams; + static final AtomicLongFieldUpdater TOTAL_MAX_CONCURRENT_STREAMS = + AtomicLongFieldUpdater.newUpdater(Http2Pool.class, "totalMaxConcurrentStreams"); + volatile int wip; static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); @@ -154,7 +158,6 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. final long maxLifeTime; final int minConnections; final PoolConfig poolConfig; - final LongAdder totalMaxConcurrentStreams = new LongAdder(); long lastInteractionTimestamp; @@ -749,7 +752,7 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - long estimateStreamsCount = pool.totalMaxConcurrentStreams.longValue() - pool.acquired; + long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired; int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { @@ -928,7 +931,7 @@ static final class Slot extends AtomicBoolean { this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams : Math.min(pool.maxConcurrentStreams, maxConcurrentStreams); } - this.pool.totalMaxConcurrentStreams.add(this.maxConcurrentStreams); + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams); } boolean canOpenStream() { @@ -940,7 +943,7 @@ boolean canOpenStream() { long diff = maxActiveStreams - maxConcurrentStreams; if (diff != 0) { maxConcurrentStreams = maxActiveStreams; - pool.totalMaxConcurrentStreams.add(diff); + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, diff); } int concurrency = this.concurrency; return concurrency < maxActiveStreams; From 43ea5457c19374225aa17e552f3e1e0a65c6ebd1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 8 Jun 2022 10:01:02 +0300 Subject: [PATCH 10/11] Update totalMaxConcurrentStreams when the connection is removed from the pool --- .../reactor/netty/http/client/Http2Pool.java | 1 + .../netty/http/client/Http2PoolTest.java | 60 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index e212e832db..4140c9eec7 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -1010,6 +1010,7 @@ void invalidate() { log.debug(format(connection.channel(), "Channel removed from pool")); } pool.poolConfig.allocationStrategy().returnPermits(1); + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams); } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 32606b377e..6780f795c2 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -65,12 +65,14 @@ void acquireInvalidate() { assertThat(acquired).hasSize(3); assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same @@ -78,6 +80,7 @@ void acquireInvalidate() { } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -106,12 +109,14 @@ void acquireRelease() { assertThat(acquired).hasSize(3); assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same @@ -119,6 +124,7 @@ void acquireRelease() { } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -147,6 +153,7 @@ void evictClosedConnection() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -159,17 +166,20 @@ void evictClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired2.poolable(); ChannelId id2 = connection.channel().id(); @@ -180,6 +190,7 @@ void evictClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -220,6 +231,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -232,6 +244,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); @@ -244,6 +257,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(3); assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(2L * Integer.MAX_VALUE); if (closeSecond) { latch = new CountDownLatch(1); @@ -262,15 +276,18 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); acquired3.get().invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); if (closeSecond) { assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } else { assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } } finally { @@ -302,6 +319,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); CountDownLatch latch = new CountDownLatch(1); @@ -313,6 +331,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -321,11 +340,13 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -357,6 +378,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -369,6 +391,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); @@ -376,12 +399,14 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired2.poolable(); ChannelId id2 = connection.channel().id(); @@ -394,6 +419,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -426,6 +452,7 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -438,12 +465,14 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -458,6 +487,7 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -494,6 +524,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -502,6 +533,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); @@ -509,12 +541,14 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -529,6 +563,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -564,6 +599,7 @@ void maxIdleTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -577,6 +613,7 @@ void maxIdleTime() throws Exception { assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -587,6 +624,7 @@ void maxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -623,6 +661,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(acquired).hasSize(2); assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection1 = acquired.get(0).poolable(); ChannelId id1 = connection1.channel().id(); @@ -633,6 +672,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection2 = acquired.get(1).poolable(); ChannelId id2 = connection2.channel().id(); @@ -643,6 +683,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { if (connection1 != null) { @@ -678,6 +719,7 @@ void maxLifeTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -686,17 +728,20 @@ void maxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -707,6 +752,7 @@ void maxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -742,6 +788,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -750,12 +797,14 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -767,6 +816,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -801,6 +851,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); @@ -808,6 +859,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -816,11 +868,13 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -858,12 +912,14 @@ void minConnections() { assertThat(http2Pool.activeStreams()).isEqualTo(3); assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable()); assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { for (PooledRef slot : acquired) { @@ -908,12 +964,14 @@ void minConnectionsMaxStreamsReached() { assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable()); assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable()); assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { for (PooledRef slot : acquired) { @@ -939,6 +997,7 @@ void nonHttp2ConnectionEmittedOnce() { assertThat(acquired).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -951,6 +1010,7 @@ void nonHttp2ConnectionEmittedOnce() { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { channel.finishAndReleaseAll(); From b53658194790d883bf5f3cd344ccfe5dcba9f0b0 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 8 Jun 2022 10:37:12 +0300 Subject: [PATCH 11/11] Obtain the application protocol once Cache H2CUpgradeHandler context --- .../http/client/Http2ConnectionProvider.java | 19 ++++++--------- .../reactor/netty/http/client/Http2Pool.java | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 06ff01db89..b80d44b553 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -17,14 +17,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.ssl.ApplicationProtocolNames; -import io.netty.handler.ssl.SslHandler; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; @@ -39,7 +37,6 @@ import reactor.core.publisher.Operators; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.ConnectionProvider; @@ -381,7 +378,7 @@ public void operationComplete(Future future) { boolean isH2cUpgrade() { Channel channel = pooledRef.poolable().channel(); - if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null && + if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null && ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { @@ -394,11 +391,9 @@ boolean isH2cUpgrade() { boolean notHttp2() { Channel channel = pooledRef.poolable().channel(); - ChannelPipeline pipeline = channel.pipeline(); - SslHandler handler = pipeline.get(SslHandler.class); - if (handler != null) { - String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; - if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol; + if (applicationProtocol != null) { + if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) { // No information for the negotiated application-level protocol, // or it is HTTP/1.1, continue as an HTTP/1.1 request // and remove the connection from this pool. @@ -409,14 +404,14 @@ boolean notHttp2() { return true; } } - else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) { + else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) { channel.attr(OWNER).set(null); invalidate(this); - sink.error(new IOException("Unknown protocol [" + protocol + "].")); + sink.error(new IOException("Unknown protocol [" + applicationProtocol + "].")); return true; } } - else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && + else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null && ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { // It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge, // continue as an HTTP/1.1 request and remove the connection from this pool. diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 4140c9eec7..a1472c1c74 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -31,6 +31,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslHandler; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -42,6 +44,7 @@ import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.FutureMono; +import reactor.netty.NettyPipeline; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -914,12 +917,14 @@ static final class Slot extends AtomicBoolean { final Connection connection; final long creationTimestamp; final Http2Pool pool; + final String applicationProtocol; long idleTimestamp; long maxConcurrentStreams; volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; + volatile ChannelHandlerContext h2cUpgradeHandlerCtx; Slot(Http2Pool pool, Connection connection) { this.connection = connection; @@ -932,6 +937,14 @@ static final class Slot extends AtomicBoolean { Math.min(pool.maxConcurrentStreams, maxConcurrentStreams); } TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams); + SslHandler handler = connection.channel().pipeline().get(SslHandler.class); + if (handler != null) { + this.applicationProtocol = handler.applicationProtocol() != null ? + handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; + } + else { + this.applicationProtocol = null; + } } boolean canOpenStream() { @@ -1000,6 +1013,17 @@ ChannelHandlerContext http2MultiplexHandlerCtx() { return ctx; } + @Nullable + ChannelHandlerContext h2cUpgradeHandlerCtx() { + ChannelHandlerContext ctx = h2cUpgradeHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(NettyPipeline.H2CUpgradeHandler); + h2cUpgradeHandlerCtx = ctx; + return ctx; + } + void incrementConcurrencyAndGet() { CONCURRENCY.incrementAndGet(this); }