Skip to content

Commit

Permalink
fix #86 Add background eviction
Browse files Browse the repository at this point in the history
This commit adds an option for background eviction of resources in case
no activity is registered on the pool.

Note that the background eviction is best effort, and will back off
if there are concurrent release / acquire activity. The later actually
checks the eviction predicate too, which allows for some amount of
eviction to occur even if the background task has backed off.
  • Loading branch information
simonbasle committed Oct 5, 2020
1 parent 2fcc5d8 commit 69b9c81
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 11 deletions.
93 changes: 93 additions & 0 deletions src/jcstress/java/reactor/pool/SimpleDequePoolStressTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2018-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.pool;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LLLL_Result;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import static org.openjdk.jcstress.annotations.Expect.*;

public class SimpleDequePoolStressTest {

@JCStressTest
@Outcome(id = {"EVICTED, ACQUIRED-10, 0, 0"}, expect = ACCEPTABLE, desc = "evicted, acquired second resource")
@Outcome(id = {"EVICTED, NOT ACQUIRED, 0, 1"}, expect = ACCEPTABLE_INTERESTING, desc = "evicted, acquired failed fast during/before destroy")
@Outcome(id = {"EVICTED, ACQUIRED1, 0, 0"}, expect = FORBIDDEN, desc = "evicted resource acquired, before destroy")
@Outcome(id = {"EVICTED, ACQUIRED1001, 0, 0"}, expect = FORBIDDEN, desc = "evicted resource acquired, after destroy")
@State
public static class BackgroundEvictionVsAcquire {

final AtomicInteger resource = new AtomicInteger();
final AtomicBoolean firstResourceCreated = new AtomicBoolean();

final SimpleDequePool<AtomicInteger> pool = PoolBuilder
.from(Mono.defer(() -> {
if (firstResourceCreated.getAndSet(true)) {
return Mono.just(new AtomicInteger(-10));
}
resource.compareAndSet(0, 1);
return Mono.just(resource);
}))
.evictInBackground(Duration.ZERO, Schedulers.immediate()) //we'll call directly
.sizeBetween(1, 1) //we'll warmup the first resource
.evictionPredicate((res, meta) -> res.get() > 0)
.destroyHandler(ai -> Mono.fromRunnable(() -> ai.addAndGet(1000)))
.build(conf -> new SimpleDequePool<>(conf, true));


{
int warmedUp = pool.warmup().block(Duration.ofSeconds(1));
if (warmedUp != 1) throw new IllegalStateException("should have warmed up one");
if (resource.get() != 1) throw new IllegalStateException("should have initiated");
}

@Actor
public void backgroundEviction() {
pool.evictInBackground();
}

@Actor
public void acquisition(LLLL_Result r) {
try {
AtomicInteger ai = pool.acquire().block().poolable();
r.r2 = "ACQUIRED" + ai.get();
}
catch (PoolAcquirePendingLimitException error) {
r.r2 = "NOT ACQUIRED";
}
}

@Arbiter
public void arbiter(LLLL_Result r) {
r.r1 = resource.get() > 1 ? "EVICTED" : "NOT EVICTED";
r.r3 = pool.idleResources.size();
r.r4 = pool.poolConfig.allocationStrategy().estimatePermitCount();
}
}

}
51 changes: 48 additions & 3 deletions src/main/java/reactor/pool/DefaultPoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package reactor.pool;

import java.time.Clock;
import java.time.Duration;
import java.util.function.BiPredicate;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* A default {@link PoolConfig} that can be extended to bear more configuration options
Expand All @@ -39,6 +41,8 @@ public class DefaultPoolConfig<POOLABLE> implements PoolConfig<POOLABLE> {
protected final Function<POOLABLE, ? extends Publisher<Void>> releaseHandler;
protected final Function<POOLABLE, ? extends Publisher<Void>> destroyHandler;
protected final BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate;
protected final Duration evictInBackgroundInterval;
protected final Scheduler evictInBackgroundScheduler;
protected final Scheduler acquisitionScheduler;
protected final PoolMetricsRecorder metricsRecorder;
protected final Clock clock;
Expand All @@ -50,6 +54,8 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
Function<POOLABLE, ? extends Publisher<Void>> releaseHandler,
Function<POOLABLE, ? extends Publisher<Void>> destroyHandler,
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate,
Duration evictInBackgroundInterval,
Scheduler evictInBackgroundScheduler,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
Expand All @@ -60,15 +66,37 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this.releaseHandler = releaseHandler;
this.destroyHandler = destroyHandler;
this.evictionPredicate = evictionPredicate;
this.evictInBackgroundInterval = evictInBackgroundInterval;
this.evictInBackgroundScheduler = evictInBackgroundScheduler;
this.acquisitionScheduler = acquisitionScheduler;
this.metricsRecorder = metricsRecorder;
this.clock = clock;
this.isIdleLRU = isIdleLRU;
}

/**
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, Function, Function, BiPredicate, Scheduler, PoolMetricsRecorder, Clock, boolean) other constructor}
* with explicit setting of {@code isIdleLru}, to be removed in 0.3.x
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean) other constructor}
* with explicit setting of background eviction, to be removed in 0.3.x
*/
@Deprecated
public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
int maxPending,
Function<POOLABLE, ? extends Publisher<Void>> releaseHandler,
Function<POOLABLE, ? extends Publisher<Void>> destroyHandler,
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU) {
this(allocator, allocationStrategy, maxPending, releaseHandler, destroyHandler, evictionPredicate,
Duration.ZERO, Schedulers.immediate(),
acquisitionScheduler, metricsRecorder, clock, isIdleLRU);
}

/**
* @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean) other constructor}
* with explicit setting of {@code isIdleLru} and background eviction, to be removed in 0.3.x
*/
@Deprecated
public DefaultPoolConfig(Mono<POOLABLE> allocator,
Expand All @@ -80,7 +108,10 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock) {
this(allocator, allocationStrategy, maxPending, releaseHandler, destroyHandler, evictionPredicate, acquisitionScheduler, metricsRecorder, clock,
this(allocator, allocationStrategy, maxPending, releaseHandler, destroyHandler, evictionPredicate,
Duration.ZERO,
Schedulers.immediate(),
acquisitionScheduler, metricsRecorder, clock,
true);
}

Expand All @@ -99,6 +130,8 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.releaseHandler = toCopyDpc.releaseHandler;
this.destroyHandler = toCopyDpc.destroyHandler;
this.evictionPredicate = toCopyDpc.evictionPredicate;
this.evictInBackgroundInterval = toCopyDpc.evictInBackgroundInterval;
this.evictInBackgroundScheduler = toCopyDpc.evictInBackgroundScheduler;
this.acquisitionScheduler = toCopyDpc.acquisitionScheduler;
this.metricsRecorder = toCopyDpc.metricsRecorder;
this.clock = toCopyDpc.clock;
Expand All @@ -111,6 +144,8 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.releaseHandler = toCopy.releaseHandler();
this.destroyHandler = toCopy.destroyHandler();
this.evictionPredicate = toCopy.evictionPredicate();
this.evictInBackgroundInterval = toCopy.evictInBackgroundInterval();
this.evictInBackgroundScheduler = toCopy.evictInBackgroundScheduler();
this.acquisitionScheduler = toCopy.acquisitionScheduler();
this.metricsRecorder = toCopy.metricsRecorder();
this.clock = toCopy.clock();
Expand Down Expand Up @@ -148,6 +183,16 @@ public BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate() {
return this.evictionPredicate;
}

@Override
public Duration evictInBackgroundInterval() {
return this.evictInBackgroundInterval;
}

@Override
public Scheduler evictInBackgroundScheduler() {
return this.evictInBackgroundScheduler;
}

@Override
public Scheduler acquisitionScheduler() {
return this.acquisitionScheduler;
Expand Down
38 changes: 30 additions & 8 deletions src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo

final Mono<T> allocator;
final Function<PoolConfig<T>, CONF> configModifier;
int maxPending = -1;
AllocationStrategy allocationStrategy = null;
Function<T, ? extends Publisher<Void>> releaseHandler = noopHandler();
Function<T, ? extends Publisher<Void>> destroyHandler = noopHandler();
BiPredicate<T, PooledRefMetadata> evictionPredicate = neverPredicate();
Scheduler acquisitionScheduler = Schedulers.immediate();
Clock clock = Clock.systemUTC();
PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE;
int maxPending = -1;
AllocationStrategy allocationStrategy = null;
Function<T, ? extends Publisher<Void>> releaseHandler = noopHandler();
Function<T, ? extends Publisher<Void>> destroyHandler = noopHandler();
BiPredicate<T, PooledRefMetadata> evictionPredicate = neverPredicate();
Duration evictionBackgroundInterval = Duration.ZERO;
Scheduler evictionBackgroundScheduler = Schedulers.immediate();
Scheduler acquisitionScheduler = Schedulers.immediate();
Clock clock = Clock.systemUTC();
PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE;
boolean idleLruOrder = true;

PoolBuilder(Mono<T> allocator, Function<PoolConfig<T>, CONF> configModifier) {
Expand All @@ -86,6 +88,8 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
this.releaseHandler = source.releaseHandler;
this.destroyHandler = source.destroyHandler;
this.evictionPredicate = source.evictionPredicate;
this.evictionBackgroundInterval = source.evictionBackgroundInterval;
this.evictionBackgroundScheduler = source.evictionBackgroundScheduler;
this.acquisitionScheduler = source.acquisitionScheduler;
this.metricsRecorder = source.metricsRecorder;
this.clock = source.clock;
Expand Down Expand Up @@ -180,6 +184,22 @@ public PoolBuilder<T, CONF> evictionPredicate(BiPredicate<T, PooledRefMetadata>
return this;
}

public PoolBuilder<T, CONF> evictOnlyWhenUsed() {
this.evictionBackgroundInterval = Duration.ZERO;
this.evictionBackgroundScheduler = Schedulers.immediate();
return this;
}

public PoolBuilder<T, CONF> evictInBackground(Duration evictionInterval) {
return this.evictInBackground(evictionInterval, Schedulers.parallel());
}

public PoolBuilder<T, CONF> evictInBackground(Duration evictionInterval, Scheduler reaperTaskScheduler) {
this.evictionBackgroundInterval = evictionInterval;
this.evictionBackgroundScheduler = reaperTaskScheduler;
return this;
}

/**
* Set the maximum number of <i>subscribed</i> {@link Pool#acquire()} Monos that can
* be in a pending state (ie they wait for a resource to be released, as no idle
Expand Down Expand Up @@ -404,6 +424,8 @@ CONF buildConfig() {
releaseHandler,
destroyHandler,
evictionPredicate,
evictionBackgroundInterval,
evictionBackgroundScheduler,
acquisitionScheduler,
metricsRecorder,
clock,
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/reactor/pool/PoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.pool;

import java.time.Clock;
import java.time.Duration;
import java.util.function.BiPredicate;
import java.util.function.Function;

Expand Down Expand Up @@ -74,6 +75,22 @@ public interface PoolConfig<POOLABLE> {
*/
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate();

/**
* If the pool is configured to perform regular eviction checks on the background, returns the {@link Duration} representing
* the interval at which such checks are made. Otherwise returns {@link Duration#ZERO} (the default).
*/
default Duration evictInBackgroundInterval() {
return Duration.ZERO; //TODO remove the default implementation in 0.2.0
}

/**
* If the pool is configured to perform regular eviction checks on the background, returns the {@link Scheduler} on
* which these checks are made. Otherwise returns {@link Schedulers#immediate()} (the default).
*/
default Scheduler evictInBackgroundScheduler() {
return Schedulers.immediate(); //TODO remove the default implementation in 0.2.0
}

/**
* When set, {@link Pool} implementation MAY decide to use the {@link Scheduler}
* to publish resources in a more deterministic way: the publishing thread would then
Expand Down

0 comments on commit 69b9c81

Please sign in to comment.