diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index a19816e512..25947313e2 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -89,6 +89,7 @@ public abstract class PooledConnectionProvider implements final Duration poolInactivity; final Duration disposeTimeout; final Map maxConnections = new HashMap<>(); + Mono onDispose; protected PooledConnectionProvider(Builder builder) { this(builder, null); @@ -106,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); maxConnections.put(entry.getKey(), entry.getValue().maxConnections); } + this.onDispose = Mono.empty(); scheduleInactivePoolsDisposal(); } @@ -190,10 +192,10 @@ public final Mono disposeLater() { }) .collect(Collectors.toList()); if (pools.isEmpty()) { - return Mono.empty(); + return onDispose; } channelPools.clear(); - return Mono.when(pools); + return onDispose.and(Mono.when(pools)); }); } @@ -243,6 +245,10 @@ public String name() { return name; } + public void onDispose(Mono disposeMono) { + onDispose = onDispose.and(disposeMono); + } + protected abstract CoreSubscriber> createDisposableAcquire( TransportConfig config, ConnectionObserver connectionObserver, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java index 801dcac824..a94268145b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java @@ -15,6 +15,7 @@ */ package reactor.netty.http; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) { http2ConnectionProvider = new AtomicReference<>(); } + @Override + public void disposeWhen(SocketAddress remoteAddress) { + ConnectionProvider provider = http2ConnectionProvider.get(); + if (provider != null) { + provider.disposeWhen(remoteAddress); + } + super.disposeWhen(remoteAddress); + } + @Override public AddressResolverGroup getOrCreateDefaultResolver() { return super.getOrCreateDefaultResolver(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index cf76c73902..f8cb3eb3b4 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -16,13 +16,12 @@ package reactor.netty.http.client; import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.ApplicationProtocolNames; -import io.netty.handler.ssl.SslHandler; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; @@ -37,7 +36,6 @@ import reactor.core.publisher.Operators; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.ConnectionProvider; @@ -76,6 +74,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider Http2ConnectionProvider(ConnectionProvider parent) { super(initConfiguration(parent)); this.parent = parent; + if (parent instanceof PooledConnectionProvider) { + ((PooledConnectionProvider) parent).onDispose(disposeLater()); + } } static Builder initConfiguration(ConnectionProvider parent) { @@ -332,11 +333,12 @@ public void onUncaughtException(Connection connection, Throwable error) { @Override public void operationComplete(Future future) { Channel channel = pooledRef.poolable().channel(); - Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class); + ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx(); if (future.isSuccess()) { Http2StreamChannel ch = future.getNow(); - if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) { + if (!channel.isActive() || frameCodec == null || + !((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) { invalidate(this); if (!retried) { if (log.isDebugEnabled()) { @@ -358,8 +360,8 @@ public void operationComplete(Future future) { sink.success(ops); } - Http2Connection.Endpoint localEndpoint = frameCodec.connection().local(); if (log.isDebugEnabled()) { + Http2Connection.Endpoint localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local(); logStreamsState(ch, localEndpoint, "Stream opened"); } } @@ -372,8 +374,8 @@ public void operationComplete(Future future) { boolean isH2cUpgrade() { Channel channel = pooledRef.poolable().channel(); - if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null && - channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) { + if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { sink.success(ops); @@ -385,11 +387,9 @@ boolean isH2cUpgrade() { boolean notHttp2() { Channel channel = pooledRef.poolable().channel(); - ChannelPipeline pipeline = channel.pipeline(); - SslHandler handler = pipeline.get(SslHandler.class); - if (handler != null) { - String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; - if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol; + if (applicationProtocol != null) { + if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) { // No information for the negotiated application-level protocol, // or it is HTTP/1.1, continue as an HTTP/1.1 request // and remove the connection from this pool. @@ -400,15 +400,15 @@ boolean notHttp2() { return true; } } - else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) { + else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) { channel.attr(OWNER).set(null); invalidate(this); - sink.error(new IOException("Unknown protocol [" + protocol + "].")); + sink.error(new IOException("Unknown protocol [" + applicationProtocol + "].")); return true; } } - else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && - pipeline.get(NettyPipeline.H2MultiplexHandler) == null) { + else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { // It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge, // continue as an HTTP/1.1 request and remove the connection from this pool. ChannelOperations ops = ChannelOperations.get(channel); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a86dd87f3b..8162e5df18 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -24,9 +24,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslHandler; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -36,8 +41,8 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; +import reactor.netty.FutureMono; +import reactor.netty.NettyPipeline; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -59,7 +64,6 @@ *
    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • - *
  • The connection has no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -75,9 +79,9 @@ *

* This pool always invalidate the {@link PooledRef}, there is no release functionality. *

    - *
  • {@link PoolMetrics#acquiredSize()} and {@link PoolMetrics#allocatedSize()} always return the number of - * the active streams from all connections currently in the pool.
  • - *
  • {@link PoolMetrics#idleSize()} always returns {@code 0}.
  • + *
  • {@link PoolMetrics#acquiredSize()}, {@link PoolMetrics#allocatedSize()} and {@link PoolMetrics#idleSize()} + * always return the number of the cached connections.
  • + *
  • {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
  • *
*

* Configurations that are not applicable @@ -114,6 +118,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. static final AtomicReferenceFieldUpdater CONNECTIONS = AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections"); + volatile int idleSize; + private static final AtomicIntegerFieldUpdater IDLE_SIZE = + AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize"); + /** * Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that, * use addPending/pollPending/removePending methods which take care of maintaining the pending queue size. @@ -171,12 +179,12 @@ public Mono> acquire(Duration timeout) { @Override public int acquiredSize() { - return acquired; + return allocatedSize() - idleSize(); } @Override public int allocatedSize() { - return acquired; + return poolConfig.allocationStrategy().permitGranted(); } @Override @@ -197,10 +205,19 @@ public Mono disposeLater() { p.fail(new PoolShutdownException()); } - // the last stream on that connection will release the connection to the parent pool - // the structure should not contain connections with 0 streams as the last stream on that connection - // always removes the connection from this pool - CONNECTIONS.getAndSet(this, null); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.getAndSet(this, null); + if (slots != null) { + Mono closeMonos = Mono.empty(); + while (!slots.isEmpty()) { + Slot slot = pollSlot(slots); + if (slot != null) { + slot.invalidate(); + closeMonos = closeMonos.and(DEFAULT_DESTROY_HANDLER.apply(slot.connection)); + } + } + return closeMonos; + } } return Mono.empty(); }); @@ -218,7 +235,7 @@ public int getMaxPendingAcquireSize() { @Override public int idleSize() { - return 0; + return idleSize; } @Override @@ -253,6 +270,10 @@ public Mono warmup() { return Mono.just(0); } + int activeStreams() { + return acquired; + } + void cancelAcquire(Borrower borrower) { if (!isDisposed()) { ConcurrentLinkedDeque q = pending; @@ -260,15 +281,32 @@ void cancelAcquire(Borrower borrower) { } } + @SuppressWarnings("FutureReturnValueIgnored") Mono destroyPoolable(Http2PooledRef ref) { + assert ref.slot.connection.channel().eventLoop().inEventLoop(); Mono mono = Mono.empty(); try { + // By default, check the connection for removal on acquire and invalidate (only if there are no active streams) if (ref.slot.decrementConcurrencyAndGet() == 0) { - ref.slot.invalidate(); - Connection connection = ref.poolable(); - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - if (frameCodec != null) { - releaseConnection(connection); + // not HTTP/2 request + if (ref.slot.http2FrameCodecCtx() == null) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // If there is eviction in background, the background process will remove this connection + else if (poolConfig.evictInBackgroundInterval().isZero()) { + // not active + if (!ref.poolable().channel().isActive()) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // max life reached + else if (maxLifeReached(ref.slot)) { + //"FutureReturnValueIgnored" this is deliberate + ref.slot.connection.channel().close(); + ref.slot.invalidate(); + removeSlot(ref.slot); + } } } } @@ -315,27 +353,22 @@ void drainLoop() { if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { - resources.offer(slot); + offerSlot(resources, slot); continue; } if (isDisposed()) { borrower.fail(new PoolShutdownException()); return; } - if (slot.incrementConcurrencyAndGet() > 1) { - borrower.stopPendingCountdown(); - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - // we are ready here, the connection can be used for opening another stream - slot.deactivate(); - poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot))); - } - else { - addPending(borrowers, borrower, true); - continue; + borrower.stopPendingCountdown(); + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Channel activated")); } + ACQUIRED.incrementAndGet(this); + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); + drain(); + }); } else { int permits = poolConfig.allocationStrategy().getPermits(1); @@ -372,8 +405,6 @@ void drainLoop() { log.debug(format(newInstance.channel(), "Channel activated")); } ACQUIRED.incrementAndGet(this); - newSlot.incrementConcurrencyAndGet(); - newSlot.deactivate(); borrower.deliver(new Http2PooledRef(newSlot)); } else if (sig.isOnError()) { @@ -398,15 +429,16 @@ else if (sig.isOnError()) { } @Nullable + @SuppressWarnings("FutureReturnValueIgnored") Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = resources.size(); + int resourcesCount = idleSize; while (resourcesCount > 0) { // There are connections in the queue resourcesCount--; // get the connection - Slot slot = resources.poll(); + Slot slot = pollSlot(resources); if (slot == null) { continue; } @@ -418,38 +450,40 @@ Slot findConnection(ConcurrentLinkedQueue resources) { log.debug(format(slot.connection.channel(), "Channel is closed, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool")); } - resources.remove(slot); + slot.invalidate(); } continue; } // check that the connection's max lifetime has not been reached - if (maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime) { + if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); } - resources.remove(slot); + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); } continue; } // check that the connection's max active streams has not been reached if (!slot.canOpenStream()) { - resources.offer(slot); + offerSlot(resources, slot); if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max active streams is reached")); } @@ -462,6 +496,10 @@ Slot findConnection(ConcurrentLinkedQueue resources) { return null; } + boolean maxLifeReached(Slot slot) { + return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime; + } + void pendingAcquireLimitReached(Borrower borrower, int maxPending) { if (maxPending == 0) { borrower.fail(new PoolAcquirePendingLimitException(0, @@ -530,33 +568,40 @@ int addPending(ConcurrentLinkedDeque borrowers, Borrower borrower, boo return PENDING_SIZE.incrementAndGet(this); } - static boolean offerSlot(Slot slot) { - @SuppressWarnings("unchecked") - ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - return q != null && q.offer(slot); + void offerSlot(@Nullable ConcurrentLinkedQueue slots, Slot slot) { + if (slots != null && slots.offer(slot)) { + IDLE_SIZE.incrementAndGet(this); + } } - static void releaseConnection(Connection connection) { - ChannelOperations ops = connection.as(ChannelOperations.class); - if (ops != null) { - ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING); - } - else if (connection instanceof ConnectionObserver) { - ((ConnectionObserver) connection).onStateChange(connection, ConnectionObserver.State.DISCONNECTING); + @Nullable + Slot pollSlot(@Nullable ConcurrentLinkedQueue slots) { + if (slots == null) { + return null; } - else { - connection.dispose(); + Slot slot = slots.poll(); + if (slot != null) { + IDLE_SIZE.decrementAndGet(this); } + return slot; } - static void removeSlot(Slot slot) { + void removeSlot(Slot slot) { @SuppressWarnings("unchecked") ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - if (q != null) { - q.remove(slot); + if (q != null && q.remove(slot)) { + IDLE_SIZE.decrementAndGet(this); } } + static final Function> DEFAULT_DESTROY_HANDLER = + connection -> { + if (!connection.channel().isActive()) { + return Mono.empty(); + } + return FutureMono.from(connection.channel().close()); + }; + static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable { static final Disposable TIMEOUT_DISPOSED = Disposables.disposed(); @@ -589,7 +634,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - if (!acquireTimeout.isZero()) { + // Cannot rely on idleSize because there might be idle connections but not suitable for use + int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); + int pending = pool.pendingSize; + if (!acquireTimeout.isZero() && permits <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -627,7 +675,9 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - stopPendingCountdown(); + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + poolSlot.slot.incrementConcurrencyAndGet(); + poolSlot.slot.deactivate(); if (get()) { //CANCELLED or timeout reached poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); @@ -737,7 +787,7 @@ public String toString() { } } - static final class Slot { + static final class Slot extends AtomicBoolean { volatile int concurrency; static final AtomicIntegerFieldUpdater CONCURRENCY = @@ -746,18 +796,30 @@ static final class Slot { final Connection connection; final long creationTimestamp; final Http2Pool pool; + final String applicationProtocol; + + volatile ChannelHandlerContext http2FrameCodecCtx; + volatile ChannelHandlerContext http2MultiplexHandlerCtx; + volatile ChannelHandlerContext h2cUpgradeHandlerCtx; Slot(Http2Pool pool, Connection connection) { this.connection = connection; this.creationTimestamp = pool.clock.millis(); this.pool = pool; + SslHandler handler = connection.channel().pipeline().get(SslHandler.class); + if (handler != null) { + this.applicationProtocol = handler.applicationProtocol() != null ? + handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; + } + else { + this.applicationProtocol = null; + } } boolean canOpenStream() { - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - Http2MultiplexHandler multiplexHandler = connection.channel().pipeline().get(Http2MultiplexHandler.class); - if (frameCodec != null && multiplexHandler != null) { - int maxActiveStreams = frameCodec.connection().local().maxActiveStreams(); + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); int concurrency = this.concurrency; return concurrency < maxActiveStreams; } @@ -772,23 +834,59 @@ void deactivate() { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Channel deactivated")); } - offerSlot(this); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.get(pool); + pool.offerSlot(slots, this); } int decrementConcurrencyAndGet() { return CONCURRENCY.decrementAndGet(this); } - int incrementConcurrencyAndGet() { - return CONCURRENCY.incrementAndGet(this); + @Nullable + ChannelHandlerContext http2FrameCodecCtx() { + ChannelHandlerContext ctx = http2FrameCodecCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2FrameCodec.class); + http2FrameCodecCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext http2MultiplexHandlerCtx() { + ChannelHandlerContext ctx = http2MultiplexHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2MultiplexHandler.class); + http2MultiplexHandlerCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext h2cUpgradeHandlerCtx() { + ChannelHandlerContext ctx = h2cUpgradeHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(NettyPipeline.H2CUpgradeHandler); + h2cUpgradeHandlerCtx = ctx; + return ctx; + } + + void incrementConcurrencyAndGet() { + CONCURRENCY.incrementAndGet(this); } void invalidate() { - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), "Channel removed from pool")); + if (compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug(format(connection.channel(), "Channel removed from pool")); + } + pool.poolConfig.allocationStrategy().returnPermits(1); } - pool.poolConfig.allocationStrategy().returnPermits(1); - removeSlot(this); } long lifeTime() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 08bd8ee6af..3543fe1608 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -22,16 +22,21 @@ import java.net.SocketAddress; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; import static reactor.netty.Metrics.ACTIVE_STREAMS; import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; import static reactor.netty.Metrics.ID; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; import static reactor.netty.Metrics.NAME; import static reactor.netty.Metrics.PENDING_STREAMS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { + static final String ACTIVE_CONNECTIONS_DESCRIPTION = + "The number of the connections that have been successfully acquired and are in active use"; static final String ACTIVE_STREAMS_DESCRIPTION = "The number of the active HTTP/2 streams"; + static final String IDLE_CONNECTIONS_DESCRIPTION = "The number of the idle connections"; static final String PENDING_STREAMS_DESCRIPTION = "The number of requests that are waiting for opening HTTP/2 stream"; @@ -45,11 +50,21 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In String addressAsString = Metrics.formatSocketAddress(remoteAddress); Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName); - Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + .description(ACTIVE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) .description(ACTIVE_STREAMS_DESCRIPTION) .tags(tags) .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::idleSize) + .description(IDLE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, metrics, InstrumentedPool.PoolMetrics::pendingAcquireSize) .description(PENDING_STREAMS_DESCRIPTION) .tags(tags) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index dc24581f27..ec68ef3669 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; import reactor.netty.internal.shaded.reactor.pool.PoolConfig; @@ -38,6 +37,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -61,21 +61,23 @@ void acquireInvalidate() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -92,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -100,21 +102,23 @@ void acquireRelease() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -141,7 +145,7 @@ void evictClosedConnection() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -153,18 +157,18 @@ void evictClosedConnection() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired2.poolable(); @@ -174,8 +178,8 @@ void evictClosedConnection() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection != null) { @@ -186,12 +190,22 @@ void evictClosedConnection() throws Exception { } @Test - void evictClosedConnectionMaxConnectionsNotReached() throws Exception { + void evictClosedConnectionMaxConnectionsNotReached_1() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(false); + } + + @Test + void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(true); + } + + private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); return Connection.from(channel); })) .idleResourceReuseLruOrder() @@ -204,7 +218,7 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -216,25 +230,48 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); - assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); - assertThat(http2Pool.connections.size()).isEqualTo(2); + + AtomicReference> acquired3 = new AtomicReference<>(); + http2Pool.acquire().subscribe(acquired3::set); connection = acquired2.poolable(); - ChannelId id2 = connection.channel().id(); + ((EmbeddedChannel) connection.channel()).runPendingTasks(); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.connections.size()).isEqualTo(2); + + if (closeSecond) { + latch = new CountDownLatch(1); + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.onDispose(latch::countDown); + connection.dispose(); + assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); + } + + ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + acquired3.get().invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + if (closeSecond) { + assertThat(http2Pool.connections.size()).isEqualTo(0); + } + else { + assertThat(http2Pool.connections.size()).isEqualTo(1); + } } finally { if (connection != null) { @@ -263,7 +300,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -274,7 +311,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -282,12 +319,12 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -318,7 +355,7 @@ void maxLifeTime() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); @@ -326,18 +363,18 @@ void maxLifeTime() throws Exception { Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection2 = acquired2.poolable(); @@ -347,8 +384,8 @@ void maxLifeTime() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -374,7 +411,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); Connection connection1 = null; Connection connection2 = null; @@ -382,21 +419,21 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); - Thread.sleep(10); + Thread.sleep(50); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); connection2 = acquired2.poolable(); @@ -407,8 +444,8 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -441,14 +478,14 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -456,12 +493,12 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -488,24 +525,25 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); assertThat(acquired).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); acquired.invalidate().block(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 6ee8a43053..32453a4c24 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -15,6 +15,10 @@ */ package reactor.netty.resources; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -28,7 +32,9 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -62,18 +68,41 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; +import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; +import static reactor.netty.Metrics.NAME; +import static reactor.netty.Metrics.REMOTE_ADDRESS; +import static reactor.netty.Metrics.TOTAL_CONNECTIONS; +import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; class DefaultPooledConnectionProviderTest extends BaseHttpTest { static SelfSignedCertificate ssc; + private MeterRegistry registry; + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); } + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + Metrics.addRegistry(registry); + } + + @AfterEach + void tearDown() { + Metrics.removeRegistry(registry); + registry.clear(); + registry.close(); + } + @Test void testIssue903() { Http11SslContextSpec serverCtx = Http11SslContextSpec.forServer(ssc.key(), ssc.cert()); @@ -290,7 +319,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, } @Test - void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { + void testConnectionIdleWhenNoActiveStreams() throws Exception { Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() @@ -307,19 +336,31 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { int requestsNum = 10; CountDownLatch latch = new CountDownLatch(1); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionReturnedToParentPoolWhenNoActiveStreams", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionIdleWhenNoActiveStreams", 5); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = createClient(provider, disposableServer.port()) - .wiretap(false) + .wiretap(false) .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); @@ -328,7 +369,7 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { .flatMap(i -> client.post() .uri("/") - .send(ByteBufMono.fromString(Mono.just("testConnectionReturnedToParentPoolWhenNoActiveStreams"))) + .send(ByteBufMono.fromString(Mono.just("testConnectionIdleWhenNoActiveStreams"))) .responseContent() .aggregate() .asString()) @@ -336,14 +377,16 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(provider.channelPools).hasSize(1); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testConnectionIdleWhenNoActiveStreams"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -442,21 +485,33 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie .bindNow(); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("doTestIssue1982", 5); CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient mainClient = clientCtx != null ? HttpClient.create(provider).port(disposableServer.port()).secure(sslContextSpec -> sslContextSpec.sslContext(clientCtx)) : HttpClient.create(provider).port(disposableServer.port()); HttpClient client = mainClient.protocol(clientProtocols) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); try { @@ -471,12 +526,16 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "doTestIssue1982"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -512,4 +571,13 @@ public boolean trySuccess(Void result) { return r; } } + + private double getGaugeValue(String gaugeName, String... tags) { + Gauge gauge = registry.find(gaugeName).tags(tags).gauge(); + double result = -1; + if (gauge != null) { + result = gauge.value(); + } + return result; + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index ac752e082f..a466ff3982 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -207,15 +207,17 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole assertThat(metrics.get()).isTrue(); if (isSecured) { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(1); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, "http2." + poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, "http2." + poolName)).isEqualTo(1); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, "http2." + poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2." + poolName)).isEqualTo(0); } else { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(0); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); } - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_CONNECTIONS, poolName)).isEqualTo(expectedMaxConnection); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire);