Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #922 Add ability to configure maxLifeTime for connection pools #923

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 13 additions & 9 deletions src/main/java/reactor/netty/resources/ConnectionProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static ConnectionProvider newConnection() {
* {@link Connection}
*/
static ConnectionProvider elastic(String name) {
return elastic(name, null);
return elastic(name, null, null);
}

/**
Expand All @@ -91,20 +91,22 @@ static ConnectionProvider elastic(String name) {
* of clients using it.
*
* @param name the channel pool map name
* @param maxIdleTime the {@link Duration} after which the channel will be closed (resolution: ms),
* @param maxIdleTime the {@link Duration} after which the channel will be closed when idle (resolution: ms),
* if {@code NULL} there is no max idle time
* @param maxLifeTime the {@link Duration} after which the channel will be closed (resolution: ms),
* if {@code NULL} there is no max life time
*
* @return a new {@link ConnectionProvider} to cache and grow on demand
* {@link Connection}
*/
static ConnectionProvider elastic(String name, @Nullable Duration maxIdleTime) {
static ConnectionProvider elastic(String name, @Nullable Duration maxIdleTime, @Nullable Duration maxLifeTime) {
return new PooledConnectionProvider(name,
(allocator, destroyHandler, evictionPredicate) ->
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> maxIdleTime != null &&
meta.idleTime() >= maxIdleTime.toMillis()))
.or((poolable, meta) -> (maxIdleTime != null && meta.idleTime() >= maxIdleTime.toMillis())
|| (maxLifeTime != null && meta.lifeTime() >= maxLifeTime.toMillis())))
.fifo());
}

Expand Down Expand Up @@ -156,7 +158,7 @@ static ConnectionProvider fixed(String name, int maxConnections) {
* number of {@link Connection}
*/
static ConnectionProvider fixed(String name, int maxConnections, long acquireTimeout) {
return fixed(name, maxConnections, acquireTimeout, null);
return fixed(name, maxConnections, acquireTimeout, null, null);
}

/**
Expand All @@ -171,11 +173,13 @@ static ConnectionProvider fixed(String name, int maxConnections, long acquireTim
* must complete or the {@link TimeoutException} will be thrown.
* @param maxIdleTime the {@link Duration} after which the channel will be closed (resolution: ms),
* if {@code NULL} there is no max idle time
* @param maxLifeTime the {@link Duration} after which the channel will be closed (resolution: ms),
* if {@code NULL} there is no max life time
*
* @return a new {@link ConnectionProvider} to cache and reuse a fixed maximum
* number of {@link Connection}
*/
static ConnectionProvider fixed(String name, int maxConnections, long acquireTimeout, @Nullable Duration maxIdleTime) {
static ConnectionProvider fixed(String name, int maxConnections, long acquireTimeout, @Nullable Duration maxIdleTime, @Nullable Duration maxLifeTime) {
if (maxConnections == -1) {
return elastic(name);
}
Expand All @@ -192,8 +196,8 @@ static ConnectionProvider fixed(String name, int maxConnections, long acquireTim
.maxPendingAcquireUnbounded()
.destroyHandler(destroyHandler)
.evictionPredicate(evictionPredicate
.or((poolable, meta) -> maxIdleTime != null &&
meta.idleTime() >= maxIdleTime.toMillis()))
.or((poolable, meta) -> (maxIdleTime != null && meta.idleTime() >= maxIdleTime.toMillis())
|| (maxLifeTime != null && meta.lifeTime() >= maxLifeTime.toMillis())))
.fifo(),
acquireTimeout,
maxConnections);
Expand Down
59 changes: 57 additions & 2 deletions src/test/java/reactor/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1841,14 +1841,14 @@ private void doTestIssue777_2(HttpClient client, String uri, String expectation,

@Test
public void testConnectionIdleTimeFixedPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100, Duration.ofMillis(10));
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100, Duration.ofMillis(10), null);
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}

@Test
public void testConnectionIdleTimeElasticPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.elastic("test", Duration.ofMillis(10));
ConnectionProvider provider = ConnectionProvider.elastic("test", Duration.ofMillis(10), null);
ChannelId[] ids = doTestConnectionIdleTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}
Expand Down Expand Up @@ -1893,6 +1893,61 @@ private ChannelId[] doTestConnectionIdleTime(ConnectionProvider provider) throws
return new ChannelId[] {id1, id2};
}

@Test
public void testConnectionLifeTimeFixedPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100, null, Duration.ofMillis(30));
ChannelId[] ids = doTestConnectionLifeTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}

@Test
public void testConnectionLifeTimeElasticPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.elastic("test", null, Duration.ofMillis(30));
ChannelId[] ids = doTestConnectionLifeTime(provider);
assertThat(ids[0]).isNotEqualTo(ids[1]);
}

@Test
public void testConnectionNoLifeTimeFixedPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.fixed("test", 1, 100);
ChannelId[] ids = doTestConnectionLifeTime(provider);
assertThat(ids[0]).isEqualTo(ids[1]);
}

@Test
public void testConnectionNoLifeTimeElasticPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.elastic("test");
ChannelId[] ids = doTestConnectionLifeTime(provider);
assertThat(ids[0]).isEqualTo(ids[1]);
}

private ChannelId[] doTestConnectionLifeTime(ConnectionProvider provider) throws Exception {
DisposableServer server =
HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendObject(ByteBufFlux.fromString(Mono.delay(Duration.ofMillis(30)).map(Objects::toString))))
.wiretap(true)
.bindNow();

Flux<ChannelId> id = createHttpClientForContextWithAddress(server, provider)
.get()
.uri("/")
.responseConnection((res, conn) -> Mono.just(conn.channel().id())
.delayUntil(ch -> conn.inbound().receive()));

ChannelId id1 = id.blockLast(Duration.ofSeconds(30));
Thread.sleep(10);
ChannelId id2 = id.blockLast(Duration.ofSeconds(30));

assertThat(id1).isNotNull();
assertThat(id2).isNotNull();

server.disposeNow();
provider.dispose();
return new ChannelId[] {id1, id2};
}


@Test
public void testResourceUrlSetInResponse() {
DisposableServer server =
Expand Down