Skip to content

Commit

Permalink
Merge #2257 into main
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 8, 2022
2 parents 258aa65 + 6530f76 commit a516bea
Show file tree
Hide file tree
Showing 19 changed files with 1,828 additions and 344 deletions.
2 changes: 1 addition & 1 deletion codequality/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
<property name="max" value="4"/>
</module>
<module name="NestedIfDepth">
<property name="max" value="4"/>
<property name="max" value="5"/>
</module>
<module name="NestedTryDepth">
<property name="max" value="4"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public interface NettyPipeline {
String CompressionHandler = LEFT + "compressionHandler";
String ConnectMetricsHandler = LEFT + "connectMetricsHandler";
String H2CUpgradeHandler = LEFT + "h2cUpgradeHandler";
String H2Flush = LEFT + "h2Flush";
String H2MultiplexHandler = LEFT + "h2MultiplexHandler";
String H2OrHttp11Codec = LEFT + "h2OrHttp11Codec";
String H2ToHttp11Codec = LEFT + "h2ToHttp11Codec";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,68 @@ default String name() {
return null;
}

interface AllocationStrategy<A extends AllocationStrategy<A>> {

/**
* Returns a deep copy of this instance.
*
* @return a deep copy of this instance
*/
A copy();

/**
* Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be
* allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
*
* @return an ESTIMATED count of how many more connections can currently be allocated
*/
int estimatePermitCount();

/**
* Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible
* number of connections which MUST be created (otherwise the internal live counter of the strategy might be off).
* This permissible number might be zero, and it can also be a greater number than {@code desired}.
* Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
* (which can happen in batches or with value {@literal 1}).
*
* @param desired the desired number of new connections
* @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired}
*/
int getPermits(int desired);

/**
* Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*
* @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*/
int permitGranted();

/**
* Return the minimum number of permits this strategy tries to maintain granted
* (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
*
* @return the minimum number of permits this strategy tries to maintain, or {@code 0}
*/
int permitMinimum();

/**
* Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*
* @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*/
int permitMaximum();

/**
* Update the strategy to indicate that N connections were discarded, potentially leaving space
* for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
* number of held permits it has.
* <p>
* Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);
}

/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
Expand Down Expand Up @@ -387,6 +449,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
AllocationStrategy<?> allocationStrategy;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -410,6 +473,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
this.allocationStrategy = copy.allocationStrategy;
}

/**
Expand All @@ -428,17 +492,21 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) {

/**
* Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool.
* This is a pre-made allocation strategy where only max connections is specified.
* Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}.
* Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}.
*
* @param maxConnections the maximum number of connections (per connection pool) before start pending
* @return {@literal this}
* @see #allocationStrategy(AllocationStrategy)
* @throws IllegalArgumentException if maxConnections is negative
*/
public final SPEC maxConnections(int maxConnections) {
if (maxConnections <= 0) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
this.allocationStrategy = null;
return get();
}

Expand Down Expand Up @@ -580,6 +648,22 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}

/**
* Limits in how many connections can be allocated and managed by the pool are driven by the
* provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
* configured strategy, but most cases should be covered by the {@link #maxConnections()}
* pre-made allocation strategy.
*
* @param allocationStrategy the {@link AllocationStrategy} to use
* @return {@literal this}
* @see #maxConnections()
* @since 1.0.20
*/
public final SPEC allocationStrategy(AllocationStrategy<?> allocationStrategy) {
this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
return get();
}

@Override
@SuppressWarnings("unchecked")
public SPEC get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import reactor.netty.internal.util.Metrics;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
Expand Down Expand Up @@ -91,6 +90,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration poolInactivity;
final Duration disposeTimeout;
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

protected PooledConnectionProvider(Builder builder) {
this(builder, null);
Expand All @@ -108,6 +108,7 @@ protected PooledConnectionProvider(Builder builder) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}

Expand Down Expand Up @@ -197,10 +198,10 @@ public final Mono<Void> disposeLater() {
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
return Mono.empty();
return onDispose;
}
channelPools.clear();
return Mono.when(pools);
return onDispose.and(Mono.when(pools));
});
}

Expand Down Expand Up @@ -250,6 +251,10 @@ public String name() {
return name;
}

public void onDispose(Mono<Void> disposeMono) {
onDispose = onDispose.and(disposeMono);
}

protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
TransportConfig config,
ConnectionObserver connectionObserver,
Expand Down Expand Up @@ -372,6 +377,7 @@ protected static final class PoolFactory<T extends Connection> {
final Supplier<? extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
final AllocationStrategy<?> allocationStrategy;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -391,11 +397,12 @@ protected static final class PoolFactory<T extends Connection> {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
this.allocationStrategy = conf.allocationStrategy;
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate) {
if (disposeTimeout != null) {
Expand All @@ -407,7 +414,7 @@ public InstrumentedPool<T> newPool(

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
Expand Down Expand Up @@ -440,7 +447,12 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
if (allocationStrategy == null) {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
}
else {
poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy.copy()));
}
}

if (clock != null) {
Expand All @@ -457,6 +469,15 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
return poolBuilder;
}

@Nullable
public AllocationStrategy<?> allocationStrategy() {
return allocationStrategy;
}

public long maxIdleTime() {
return this.maxIdleTime;
}

public long maxLifeTime() {
return maxLifeTime;
}
Expand All @@ -474,6 +495,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}

static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {

final AllocationStrategy<?> delegate;

DelegatingAllocationStrategy(AllocationStrategy<?> delegate) {
this.delegate = delegate;
}

@Override
public int estimatePermitCount() {
return delegate.estimatePermitCount();
}

@Override
public int getPermits(int desired) {
return delegate.getPermits(desired);
}

@Override
public int permitGranted() {
return delegate.permitGranted();
}

@Override
public int permitMinimum() {
return delegate.permitMinimum();
}

@Override
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public void returnPermits(int returned) {
delegate.returnPermits(returned);
}
}
}

static final class PoolKey {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@

class ConnectionProviderTest {

static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy();
static final String TEST_STRING = "";
static final Supplier<ConnectionProvider.MeterRegistrar> TEST_SUPPLIER = () -> (a, b, c, d) -> {};

Expand Down Expand Up @@ -64,6 +65,9 @@ else if (Map.class == clazz) {
else if (Supplier.class == clazz) {
field.set(builder, TEST_SUPPLIER);
}
else if (ConnectionProvider.AllocationStrategy.class == clazz) {
field.set(builder, TEST_ALLOCATION_STRATEGY);
}
else if (boolean.class == clazz) {
field.setBoolean(builder, true);
}
Expand All @@ -74,4 +78,41 @@ else if (int.class == clazz) {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
}

static final class TestAllocationStrategy implements ConnectionProvider.AllocationStrategy<TestAllocationStrategy> {

@Override
public TestAllocationStrategy copy() {
return this;
}

@Override
public int estimatePermitCount() {
return 0;
}

@Override
public int getPermits(int desired) {
return 0;
}

@Override
public int permitGranted() {
return 0;
}

@Override
public int permitMinimum() {
return 0;
}

@Override
public int permitMaximum() {
return 0;
}

@Override
public void returnPermits(int returned) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) {
http2ConnectionProvider = new AtomicReference<>();
}

@Override
public void disposeWhen(SocketAddress remoteAddress) {
ConnectionProvider provider = http2ConnectionProvider.get();
if (provider != null) {
provider.disposeWhen(remoteAddress);
}
super.disposeWhen(remoteAddress);
}

@Override
public AddressResolverGroup<?> getOrCreateDefaultResolver() {
return super.getOrCreateDefaultResolver();
Expand Down
Loading

0 comments on commit a516bea

Please sign in to comment.