Skip to content

Commit

Permalink
Add evictInBackground to Http2Pool (#2257)
Browse files Browse the repository at this point in the history
Related to #2151 and #2262
  • Loading branch information
violetagg committed Jun 8, 2022
1 parent d9ae62c commit dae5de8
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -88,9 +89,6 @@
* Configurations that are not applicable
* <ul>
* <li>{@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.</li>
* <li>{@link PoolConfig#evictInBackgroundInterval()} and {@link PoolConfig#evictInBackgroundScheduler()} -
* there are no idle resources in the pool. Once the connection does not have active streams, it
* is returned to the parent pool.</li>
* <li>{@link PoolConfig#evictionPredicate()} - the eviction predicate cannot be used as more complex
* checks have to be done. Also the pool uses filtering for the connections (a connection might not be able
* to be used but is required to stay in the pool).</li>
Expand Down Expand Up @@ -155,6 +153,8 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.

long lastInteractionTimestamp;

Disposable evictionTask;

Http2Pool(PoolConfig<Connection> poolConfig, long maxIdleTime, long maxLifeTime) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
Expand All @@ -168,6 +168,8 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.
this.poolConfig = poolConfig;

recordInteractionTimestamp();

scheduleEviction();
}

@Override
Expand Down Expand Up @@ -203,6 +205,8 @@ public Mono<Void> disposeLater() {
@SuppressWarnings("unchecked")
ConcurrentLinkedDeque<Borrower> q = PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
evictionTask.dispose();

Borrower p;
while ((p = pollPending(q, true)) != null) {
p.fail(new PoolShutdownException());
Expand Down Expand Up @@ -431,6 +435,67 @@ else if (sig.isOnError()) {
}
}

@SuppressWarnings("FutureReturnValueIgnored")
void evictInBackground() {
@SuppressWarnings("unchecked")
ConcurrentLinkedQueue<Slot> resources = CONNECTIONS.get(this);
if (resources == null) {
//no need to schedule the task again, pool has been disposed
return;
}

if (WIP.getAndIncrement(this) == 0) {
if (pendingSize == 0) {
Iterator<Slot> slots = resources.iterator();
while (slots.hasNext()) {
Slot slot = slots.next();
if (slot.concurrency() == 0) {
if (!slot.connection.channel().isActive()) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool"));
}
recordInteractionTimestamp();
slots.remove();
IDLE_SIZE.decrementAndGet(this);
slot.invalidate();
continue;
}

if (maxLifeReached(slot)) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool"));
}
//"FutureReturnValueIgnored" this is deliberate
slot.connection.channel().close();
recordInteractionTimestamp();
slots.remove();
IDLE_SIZE.decrementAndGet(this);
slot.invalidate();
continue;
}
}
if (maxIdleReached(slot)) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
}
//"FutureReturnValueIgnored" this is deliberate
slot.connection.channel().close();
recordInteractionTimestamp();
slots.remove();
IDLE_SIZE.decrementAndGet(this);
slot.invalidate();
}
}
}
//at the end if there are racing drain calls, go into the drainLoop
if (WIP.decrementAndGet(this) > 0) {
drainLoop();
}
}
//schedule the next iteration
scheduleEviction();
}

@Nullable
@SuppressWarnings("FutureReturnValueIgnored")
Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
Expand Down Expand Up @@ -465,7 +530,7 @@ Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
}

// check whether the connection's idle time has been reached
if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) {
if (maxIdleReached(slot)) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
}
Expand Down Expand Up @@ -510,6 +575,10 @@ Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
return null;
}

boolean maxIdleReached(Slot slot) {
return maxIdleTime != -1 && slot.idleTime() >= maxIdleTime;
}

boolean maxLifeReached(Slot slot) {
return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime;
}
Expand Down Expand Up @@ -608,6 +677,17 @@ void removeSlot(Slot slot) {
}
}

void scheduleEviction() {
if (!poolConfig.evictInBackgroundInterval().isZero()) {
long nanosEvictionInterval = poolConfig.evictInBackgroundInterval().toNanos();
this.evictionTask = poolConfig.evictInBackgroundScheduler()
.schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS);
}
else {
this.evictionTask = Disposables.disposed();
}
}

static final Function<Connection, Publisher<Void>> DEFAULT_DESTROY_HANDLER =
connection -> {
if (!connection.channel().isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,213 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
}
}

@Test
void evictInBackgroundClosedConnection() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
new TestChannelId(),
Http2FrameCodecBuilder.forClient().build());
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1)
.evictInBackground(Duration.ofSeconds(5));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));

Connection connection = null;
try {
PooledRef<Connection> acquired1 = http2Pool.acquire().block();

assertThat(acquired1).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection = acquired1.poolable();
ChannelId id1 = connection.channel().id();
CountDownLatch latch = new CountDownLatch(1);
((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
connection.onDispose(latch::countDown);
connection.dispose();

assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();

assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

acquired1.invalidate().block();

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);

PooledRef<Connection> acquired2 = http2Pool.acquire().block();

assertThat(acquired2).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection = acquired2.poolable();
ChannelId id2 = connection.channel().id();

assertThat(id1).isNotEqualTo(id2);

acquired2.invalidate().block();

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(1);
}
finally {
if (connection != null) {
((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
connection.dispose();
}
}
}

@Test
void evictInBackgroundMaxIdleTime() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
new TestChannelId(),
Http2FrameCodecBuilder.forClient().build());
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1)
.evictInBackground(Duration.ofSeconds(5));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));

Connection connection1 = null;
Connection connection2 = null;
try {
PooledRef<Connection> acquired1 = http2Pool.acquire().block();

assertThat(acquired1).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection1 = acquired1.poolable();
ChannelId id1 = connection1.channel().id();

acquired1.invalidate().block();

Thread.sleep(15);

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);

PooledRef<Connection> acquired2 = http2Pool.acquire().block();

assertThat(acquired2).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection2 = acquired2.poolable();
ChannelId id2 = connection2.channel().id();

assertThat(id1).isNotEqualTo(id2);

acquired2.invalidate().block();

Thread.sleep(15);

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
}
finally {
if (connection1 != null) {
((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
connection1.dispose();
}
if (connection2 != null) {
((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
connection2.dispose();
}
}
}

@Test
void evictInBackgroundMaxLifeTime() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
new TestChannelId(),
Http2FrameCodecBuilder.forClient().build());
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1)
.evictInBackground(Duration.ofSeconds(5));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));

Connection connection1 = null;
Connection connection2 = null;
try {
PooledRef<Connection> acquired1 = http2Pool.acquire().block();

assertThat(acquired1).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection1 = acquired1.poolable();
ChannelId id1 = connection1.channel().id();

Thread.sleep(10);

assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

acquired1.invalidate().block();

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);

PooledRef<Connection> acquired2 = http2Pool.acquire().block();

assertThat(acquired2).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection2 = acquired2.poolable();
ChannelId id2 = connection2.channel().id();

assertThat(id1).isNotEqualTo(id2);

acquired2.invalidate().block();

Thread.sleep(10);

http2Pool.evictInBackground();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
}
finally {
if (connection1 != null) {
((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
connection1.dispose();
}
if (connection2 != null) {
((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
connection2.dispose();
}
}
}

@Test
void maxIdleTime() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
Expand Down

0 comments on commit dae5de8

Please sign in to comment.