diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 25947313e2..0823c7562a 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -455,6 +455,10 @@ PoolBuilder> newPoolInternal( return poolBuilder; } + public long maxIdleTime() { + return this.maxIdleTime; + } + public long maxLifeTime() { return maxLifeTime; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index f8cb3eb3b4..69f3f5ca7e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -466,7 +466,7 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime())); + poolConFig -> new Http2Pool(poolConFig, poolFactory.maxIdleTime(), poolFactory.maxLifeTime())); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 8162e5df18..5261ba7226 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -64,6 +64,7 @@ * @@ -148,18 +149,20 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final long maxIdleTime; final long maxLifeTime; final PoolConfig poolConfig; long lastInteractionTimestamp; - Http2Pool(PoolConfig poolConfig, long maxLifeTime) { + Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) { if (poolConfig.allocationStrategy().getPermits(0) != 0) { throw new IllegalArgumentException("No support for configuring minimum number of connections"); } this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -461,7 +464,18 @@ Slot findConnection(ConcurrentLinkedQueue resources) { continue; } - // check that the connection's max lifetime has not been reached + // check whether the connection's idle time has been reached + if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); + continue; + } + + // check whether the connection's max lifetime has been reached if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { @@ -798,6 +812,8 @@ static final class Slot extends AtomicBoolean { final Http2Pool pool; final String applicationProtocol; + long idleTimestamp; + volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; volatile ChannelHandlerContext h2cUpgradeHandlerCtx; @@ -840,7 +856,17 @@ void deactivate() { } int decrementConcurrencyAndGet() { - return CONCURRENCY.decrementAndGet(this); + int concurrency = CONCURRENCY.decrementAndGet(this); + idleTimestamp = pool.clock.millis(); + return concurrency; + } + + long idleTime() { + if (concurrency() > 0) { + return 0L; + } + long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; + return pool.clock.millis() - idleTime; } @Nullable diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index ec68ef3669..bc1ec1a561 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -94,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -335,6 +335,120 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { } } + @Test + void maxIdleTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired1.invalidate().block(); + + Thread.sleep(15); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void maxIdleTimeActiveStreams() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + List> acquired = new ArrayList<>(); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired.get(0).poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired.get(0).invalidate().block(); + + Thread.sleep(15); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired.get(1).poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isEqualTo(id2); + + acquired.get(1).invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxLifeTime() throws Exception { PoolBuilder> poolBuilder = @@ -347,7 +461,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -411,7 +525,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -471,7 +585,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection = null; try { @@ -514,7 +628,7 @@ void minConnectionsConfigNotSupported() { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1))); + .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1))); } @Test @@ -525,7 +639,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1));