Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review ben stephane #4

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ configure(rootProject) { project ->
compile "io.projectreactor:reactor-core"

compile "org.jctools:jctools-core:$jcToolsVersion"
compile "org.hdrhistogram:HdrHistogram:$hdrHistogramVersion"

// JSR-305 annotations
compileOnly "com.google.code.findbugs:jsr305:3.0.2"
Expand All @@ -189,6 +188,7 @@ configure(rootProject) { project ->
optional "org.slf4j:slf4j-api:$slf4jVersion"

// Testing
testCompile "org.hdrhistogram:HdrHistogram:$hdrHistogramVersion"
testCompile "org.assertj:assertj-core:$assertJVersion"
testCompile "org.awaitility:awaitility:$awaitilityVersion"
testCompile "io.projectreactor:reactor-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.pool.impl;
package reactor.pool;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand All @@ -22,30 +30,16 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.pool.AllocationStrategy;
import reactor.pool.Pool;
import reactor.pool.PooledRef;
import reactor.pool.metrics.NoOpPoolMetricsRecorder;
import reactor.pool.metrics.PoolMetricsRecorder;
import reactor.pool.util.AllocationStrategies;
import reactor.util.Logger;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* An abstract base version of a {@link Pool}, mutualizing small amounts of code and allowing to build common
* related classes like {@link AbstractPooledRef} or {@link Borrower}.
*
* @author Simon Baslé
* @author Stephane Maldini
*/
abstract class AbstractPool<POOLABLE> implements Pool<POOLABLE> {

Expand All @@ -61,12 +55,12 @@ abstract class AbstractPool<POOLABLE> implements Pool<POOLABLE> {
AbstractPool(DefaultPoolConfig<POOLABLE> poolConfig, Logger logger) {
this.poolConfig = poolConfig;
this.logger = logger;
this.metricsRecorder = poolConfig.metricsRecorder();
this.metricsRecorder = poolConfig.metricsRecorder;
}

abstract void doAcquire(Borrower<POOLABLE> borrower);

private void defaultDestroy(@Nullable POOLABLE poolable) {
void defaultDestroy(@Nullable POOLABLE poolable) {
if (poolable instanceof Disposable) {
((Disposable) poolable).dispose();
}
Expand All @@ -81,19 +75,21 @@ else if (poolable instanceof Closeable) {
}

/**
* Apply the configured destroyFactory to get the destroy {@link Mono} AND return a permit to the {@link AllocationStrategy},
* Apply the configured destroyHandler to get the destroy {@link Mono} AND return a permit to the {@link AllocationStrategy},
* which assumes that the {@link Mono} will always be subscribed immediately.
*
* @param ref the {@link PooledRef} that is not part of the live set
* @return the destroy {@link Mono}, which MUST be subscribed immediately
*/
Mono<Void> destroyPoolable(PooledRef<POOLABLE> ref) {
POOLABLE poolable = ref.poolable();
poolConfig.allocationStrategy().returnPermits(1);
poolConfig.allocationStrategy.returnPermits(1);
long start = metricsRecorder.now();
metricsRecorder.recordLifetimeDuration(ref.timeSinceAllocation());
Function<POOLABLE, Mono<Void>> factory = poolConfig.destroyResource();
if (factory == DefaultPoolConfig.NO_OP_FACTORY) {
if (ref instanceof PooledRefMetrics) {
metricsRecorder.recordLifetimeDuration(((PooledRefMetrics)ref).timeSinceAllocation());
}
Function<POOLABLE, Mono<Void>> factory = poolConfig.destroyHandler;
if (factory == PoolBuilder.noopHandler()) {
return Mono.fromRunnable(() -> {
defaultDestroy(poolable);
metricsRecorder.recordDestroyLatency(metricsRecorder.measureTime(start));
Expand All @@ -110,7 +106,7 @@ Mono<Void> destroyPoolable(PooledRef<POOLABLE> ref) {
*
* @author Simon Baslé
*/
abstract static class AbstractPooledRef<T> implements PooledRef<T> {
abstract static class AbstractPooledRef<T> implements PooledRef<T>, PooledRefMetrics {

final long creationTimestamp;
final PoolMetricsRecorder metricsRecorder;
Expand Down Expand Up @@ -177,22 +173,6 @@ public long timeSinceRelease() {
return metricsRecorder.measureTime(tsr);
}

/**
* Implementors MUST have the Mono call {@link #markReleased()} upon subscription.
* <p>
* {@inheritDoc}
*/
@Override
public abstract Mono<Void> release();

/**
* Implementors MUST have the Mono call {@link #markReleased()} upon subscription.
* <p>
* {@inheritDoc}
*/
@Override
public abstract Mono<Void> invalidate();

@Override
public String toString() {
return "PooledRef{" +
Expand Down Expand Up @@ -268,129 +248,34 @@ public String toString() {

/**
* A inner representation of a {@link AbstractPool} configuration.
*
* @author Simon Baslé
*/
static class DefaultPoolConfig<POOLABLE> {

static final Function<?, Mono<Void>> NO_OP_FACTORY = it -> Mono.empty();

private final Mono<POOLABLE> allocator;
private final int initialSize;
private final AllocationStrategy allocationStrategy;
private final Function<POOLABLE, Mono<Void>> resetFactory;
private final Function<POOLABLE, Mono<Void>> destroyFactory;
private final Predicate<PooledRef<POOLABLE>> evictionPredicate;
private final Scheduler deliveryScheduler;
private final PoolMetricsRecorder metricsRecorder;
final Mono<POOLABLE> allocator;
final int initialSize;
final AllocationStrategy allocationStrategy;
final Function<POOLABLE, Mono<Void>> releaseHandler;
final Function<POOLABLE, Mono<Void>> destroyHandler;
final Predicate<PooledRef<POOLABLE>> evictionPredicate;
final Scheduler acquisitionScheduler;
final PoolMetricsRecorder metricsRecorder;

DefaultPoolConfig(Mono<POOLABLE> allocator,
int initialSize,
@Nullable AllocationStrategy allocationStrategy,
@Nullable Function<POOLABLE, Mono<Void>> resetFactory,
@Nullable Function<POOLABLE, Mono<Void>> destroyFactory,
@Nullable Predicate<PooledRef<POOLABLE>> evictionPredicate,
@Nullable Scheduler deliveryScheduler,
@Nullable PoolMetricsRecorder metricsRecorder) {
AllocationStrategy allocationStrategy,
Function<POOLABLE, Mono<Void>> releaseHandler,
Function<POOLABLE, Mono<Void>> destroyHandler,
Predicate<PooledRef<POOLABLE>> evictionPredicate,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder) {
this.allocator = allocator;
this.initialSize = initialSize < 0 ? 0 : initialSize;
this.allocationStrategy = allocationStrategy == null ? AllocationStrategies.unbounded() : allocationStrategy;

@SuppressWarnings("unchecked")
Function<POOLABLE, Mono<Void>> noOp = (Function<POOLABLE, Mono<Void>>) NO_OP_FACTORY;

this.resetFactory = resetFactory == null ? noOp : resetFactory;
this.destroyFactory = destroyFactory == null ? noOp : destroyFactory;

this.evictionPredicate = evictionPredicate == null ? slot -> false : evictionPredicate;
this.deliveryScheduler = deliveryScheduler == null ? Schedulers.immediate() : deliveryScheduler;
this.metricsRecorder = metricsRecorder == null ? NoOpPoolMetricsRecorder.INSTANCE : metricsRecorder;
}

/**
nebhale marked this conversation as resolved.
Show resolved Hide resolved
* The asynchronous factory that produces new resources.
*
* @return a {@link Mono} representing the creation of a resource
*/
Mono<POOLABLE> allocator() {
return this.allocator;
}

/**
* Defines a strategy / limit for the number of pooled object to allocate.
*
* @return the {@link AllocationStrategy} for the pool
*/
AllocationStrategy allocationStrategy() {
return this.allocationStrategy;
}

/**
* @return the minimum number of objects a {@link Pool} should create at initialization.
*/
int initialSize() {
return this.initialSize;
}

/**
* When a resource is {@link PooledRef#release() released}, defines a mechanism of resetting any lingering state of
* the resource in order for it to become usable again. The {@link #evictionPredicate()} is applied AFTER this reset.
* <p>
* For example, a buffer could have a readerIndex and writerIndex that need to be flipped back to zero.
*
* @return a {@link Function} representing the asynchronous reset mechanism for a given resource
*/
Function<POOLABLE, Mono<Void>> resetResource() {
return this.resetFactory;
}

/**
* Defines a mechanism of resource destruction, cleaning up state and OS resources it could maintain (eg. off-heap
* objects, file handles, socket connections, etc...).
* <p>
* For example, a database connection could need to cleanly sever the connection link by sending a message to the database.
*
* @return a {@link Function} representing the asynchronous destroy mechanism for a given resource
*/
Function<POOLABLE, Mono<Void>> destroyResource() {
return this.destroyFactory;
}

/**
* A {@link Predicate} that checks if a resource should be disposed ({@code true}) or is still in a valid state
* for recycling. This is primarily applied when a resource is released, to check whether or not it can immediately
* be recycled, but could also be applied during an acquire attempt (detecting eg. idle resources) or by a background
* reaping process.
*
* @return A {@link Predicate} that returns true if the {@link PooledRef} should be destroyed instead of used
*/
Predicate<PooledRef<POOLABLE>> evictionPredicate() {
return this.evictionPredicate;
}

/**
* The {@link Scheduler} on which the {@link Pool} should publish resources, independently of which thread called
* {@link Pool#acquire()} or {@link PooledRef#release()} or on which thread the {@link #allocator()} produced new
* resources.
* <p>
* Use {@link Schedulers#immediate()} if determinism is less important than staying on the same threads.
*
* @return a {@link Scheduler} on which to publish resources
*/
Scheduler deliveryScheduler() {
return this.deliveryScheduler;
}

/**
* The {@link PoolMetricsRecorder} to use to collect instrumentation data of the {@link Pool}
* implementations.
* <p>
* Defaults to {@link NoOpPoolMetricsRecorder}
*
* @return the {@link PoolMetricsRecorder} to use
*/
PoolMetricsRecorder metricsRecorder() {
return this.metricsRecorder;
this.initialSize = initialSize;
this.allocationStrategy = allocationStrategy;
this.releaseHandler = releaseHandler;
this.destroyHandler = destroyHandler;
this.evictionPredicate = evictionPredicate;
this.acquisitionScheduler = acquisitionScheduler;
this.metricsRecorder = metricsRecorder;
}
}
}
Loading