diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java
index bd3999ee3a..6a34b8b6d9 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java
@@ -253,6 +253,61 @@ default String name() {
return null;
}
+ interface AllocationStrategy {
+
+ /**
+ * Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be
+ * allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
+ *
+ * @return an ESTIMATED count of how many more connections can currently be allocated
+ */
+ int estimatePermitCount();
+
+ /**
+ * Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible
+ * number of connections which MUST be created (otherwise the internal live counter of the strategy might be off).
+ * This permissible number might be zero, and it can also be a greater number than {@code desired}.
+ * Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
+ * (which can happen in batches or with value {@literal 1}).
+ *
+ * @param desired the desired number of new connections
+ * @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired}
+ */
+ int getPermits(int desired);
+
+ /**
+ * Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
+ *
+ * @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
+ */
+ int permitGranted();
+
+ /**
+ * Return the minimum number of permits this strategy tries to maintain granted
+ * (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
+ *
+ * @return the minimum number of permits this strategy tries to maintain, or {@code 0}
+ */
+ int permitMinimum();
+
+ /**
+ * Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
+ *
+ * @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
+ */
+ int permitMaximum();
+
+ /**
+ * Update the strategy to indicate that N connections were discarded, potentially leaving space
+ * for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
+ * number of held permits it has.
+ *
+ * Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
+ * is not consistent with the strategy's limits and delivered permits.
+ */
+ void returnPermits(int returned);
+ }
+
/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
@@ -387,6 +442,7 @@ class ConnectionPoolSpec> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier extends ConnectionProvider.MeterRegistrar> registrar;
+ AllocationStrategy allocationStrategy;
/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
@@ -410,6 +466,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
+ this.allocationStrategy = copy.allocationStrategy;
}
/**
@@ -428,6 +485,8 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) {
/**
* Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool.
+ * This is a pre-made allocation strategy where only max connections is specified.
+ * Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}.
* Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}.
*
* @param maxConnections the maximum number of connections (per connection pool) before start pending
@@ -439,6 +498,7 @@ public final SPEC maxConnections(int maxConnections) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
+ this.allocationStrategy = null;
return get();
}
@@ -580,6 +640,22 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}
+ /**
+ * Limits in how many connections can be allocated and managed by the pool are driven by the
+ * provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
+ * configured strategy, but most cases should be covered by the {@link #maxConnections()}
+ * pre-made allocation strategy.
+ *
+ * @param allocationStrategy the {@link AllocationStrategy} to use
+ * @return {@literal this}
+ * @see #maxConnections()
+ * @since 1.0.20
+ */
+ public final SPEC allocationStrategy(AllocationStrategy allocationStrategy) {
+ this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
+ return get();
+ }
+
@Override
@SuppressWarnings("unchecked")
public SPEC get() {
diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
index 0823c7562a..daf18c33aa 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
@@ -30,7 +30,6 @@
import reactor.netty.ReactorNetty;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
-import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
@@ -370,6 +369,7 @@ protected static final class PoolFactory {
final Supplier extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
+ final AllocationStrategy allocationStrategy;
PoolFactory(ConnectionPoolSpec> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
@@ -389,11 +389,12 @@ protected static final class PoolFactory {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
+ this.allocationStrategy = conf.allocationStrategy;
}
public InstrumentedPool newPool(
Publisher allocator,
- @Nullable AllocationStrategy allocationStrategy,
+ @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function> destroyHandler,
BiPredicate evictionPredicate) {
if (disposeTimeout != null) {
@@ -405,7 +406,7 @@ public InstrumentedPool newPool(
public InstrumentedPool newPool(
Publisher allocator,
- @Nullable AllocationStrategy allocationStrategy,
+ @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function> destroyHandler,
BiPredicate evictionPredicate,
Function, InstrumentedPool> poolFactory) {
@@ -438,7 +439,12 @@ PoolBuilder> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
- poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
+ if (allocationStrategy == null) {
+ poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
+ }
+ else {
+ poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy));
+ }
}
if (clock != null) {
@@ -455,6 +461,11 @@ PoolBuilder> newPoolInternal(
return poolBuilder;
}
+ @Nullable
+ public AllocationStrategy allocationStrategy() {
+ return allocationStrategy;
+ }
+
public long maxIdleTime() {
return this.maxIdleTime;
}
@@ -476,6 +487,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}
+
+ static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {
+
+ final AllocationStrategy delegate;
+
+ DelegatingAllocationStrategy(AllocationStrategy delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return delegate.estimatePermitCount();
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ return delegate.getPermits(desired);
+ }
+
+ @Override
+ public int permitGranted() {
+ return delegate.permitGranted();
+ }
+
+ @Override
+ public int permitMinimum() {
+ return delegate.permitMinimum();
+ }
+
+ @Override
+ public int permitMaximum() {
+ return delegate.permitMaximum();
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ delegate.returnPermits(returned);
+ }
+ }
}
static final class PoolKey {
diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
index 6a37523989..4866c16957 100644
--- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
+++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
class ConnectionProviderTest {
+ static final ConnectionProvider.AllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy();
static final String TEST_STRING = "";
static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {};
@@ -64,6 +65,9 @@ else if (Map.class == clazz) {
else if (Supplier.class == clazz) {
field.set(builder, TEST_SUPPLIER);
}
+ else if (ConnectionProvider.AllocationStrategy.class == clazz) {
+ field.set(builder, TEST_ALLOCATION_STRATEGY);
+ }
else if (boolean.class == clazz) {
field.setBoolean(builder, true);
}
@@ -74,4 +78,36 @@ else if (int.class == clazz) {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
}
+
+ static final class TestAllocationStrategy implements ConnectionProvider.AllocationStrategy {
+
+ @Override
+ public int estimatePermitCount() {
+ return 0;
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ return 0;
+ }
+
+ @Override
+ public int permitGranted() {
+ return 0;
+ }
+
+ @Override
+ public int permitMinimum() {
+ return 0;
+ }
+
+ @Override
+ public int permitMaximum() {
+ return 0;
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ }
+ }
}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
new file mode 100644
index 0000000000..2d2ebdecb2
--- /dev/null
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package reactor.netty.http.client;
+
+import reactor.netty.resources.ConnectionProvider;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * HTTP/2 {@link ConnectionProvider.AllocationStrategy}.
+ *
+ * This class is based on
+ * https://github.com/reactor/reactor-pool/blob/d5cb5b72cdbcbbee8d781e06972c4da21766107f/src/main/java/reactor/pool/AllocationStrategies.java#L73
+ *
+ * @author Violeta Georgieva
+ * @since 1.0.20
+ */
+public final class Http2AllocationStrategy implements ConnectionProvider.AllocationStrategy {
+
+ public interface Builder {
+
+ /**
+ * Build a new {@link Http2AllocationStrategy}
+ *
+ * @return a new {@link Http2AllocationStrategy}
+ */
+ Http2AllocationStrategy build();
+
+ /**
+ * Configures the maximum number of the concurrent streams that can be opened to the remote peer.
+ * When evaluating how many streams can be opened to the remote peer,
+ * the minimum of this configuration and the remote peer configuration is taken.
+ * Default to {@code -1} - use always the remote peer configuration.
+ *
+ * @param maxConcurrentStreams the maximum number of the concurrent streams that can be opened to the remote peer
+ * @return {@code this}
+ */
+ Builder maxConcurrentStreams(long maxConcurrentStreams);
+
+ /**
+ * Configures the maximum number of live connections to keep in the pool.
+ * Default to {@link Integer#MAX_VALUE} - no upper limit.
+ *
+ * @param maxConnections the maximum number of live connections to keep in the pool
+ * @return {@code this}
+ */
+ Builder maxConnections(int maxConnections);
+
+ /**
+ * Configures the minimum number of live connections to keep in the pool (can be the best effort).
+ * Default to {@code 0}.
+ *
+ * @return {@code this}
+ */
+ Builder minConnections(int minConnections);
+ }
+
+ /**
+ * Creates a builder for {@link Http2AllocationStrategy}.
+ *
+ * @return a new {@link Http2AllocationStrategy.Builder}
+ */
+ public static Http2AllocationStrategy.Builder builder() {
+ return new Http2AllocationStrategy.Build();
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return PERMITS.get(this);
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ if (desired < 0) {
+ return 0;
+ }
+
+ for (;;) {
+ int p = permits;
+ int target = Math.min(desired, p);
+
+ if (PERMITS.compareAndSet(this, p, p - target)) {
+ return target;
+ }
+ }
+ }
+
+ /**
+ * Returns the configured maximum number of the concurrent streams that can be opened to the remote peer.
+ *
+ * @return the configured maximum number of the concurrent streams that can be opened to the remote peer
+ */
+ public long maxConcurrentStreams() {
+ return maxConcurrentStreams;
+ }
+
+ @Override
+ public int permitGranted() {
+ return maxConnections - PERMITS.get(this);
+ }
+
+ @Override
+ public int permitMinimum() {
+ return minConnections;
+ }
+
+ @Override
+ public int permitMaximum() {
+ return maxConnections;
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ for (;;) {
+ int p = PERMITS.get(this);
+ if (p + returned > maxConnections) {
+ throw new IllegalArgumentException("Too many permits returned: returned=" + returned +
+ ", would bring to " + (p + returned) + "/" + maxConnections);
+ }
+ if (PERMITS.compareAndSet(this, p, p + returned)) {
+ return;
+ }
+ }
+ }
+
+ final long maxConcurrentStreams;
+ final int maxConnections;
+ final int minConnections;
+
+ volatile int permits;
+ static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
+
+ Http2AllocationStrategy(Build build) {
+ this.maxConcurrentStreams = build.maxConcurrentStreams;
+ this.maxConnections = build.maxConnections;
+ this.minConnections = build.minConnections;
+ PERMITS.lazySet(this, this.maxConnections);
+ }
+
+ static final class Build implements Builder {
+ static final long DEFAULT_MAX_CONCURRENT_STREAMS = -1;
+ static final int DEFAULT_MAX_CONNECTIONS = Integer.MAX_VALUE;
+ static final int DEFAULT_MIN_CONNECTIONS = 0;
+
+ long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
+ int maxConnections = DEFAULT_MAX_CONNECTIONS;
+ int minConnections = DEFAULT_MIN_CONNECTIONS;
+
+ @Override
+ public Http2AllocationStrategy build() {
+ if (minConnections > maxConnections) {
+ throw new IllegalArgumentException("minConnections must be less than or equal to maxConnections");
+ }
+ return new Http2AllocationStrategy(this);
+ }
+
+ @Override
+ public Builder maxConcurrentStreams(long maxConcurrentStreams) {
+ if (maxConcurrentStreams < -1) {
+ throw new IllegalArgumentException("maxConcurrentStreams must be grater than or equal to -1");
+ }
+ this.maxConcurrentStreams = maxConcurrentStreams;
+ return this;
+ }
+
+ @Override
+ public Builder maxConnections(int maxConnections) {
+ if (maxConnections < 1) {
+ throw new IllegalArgumentException("maxConnections must be strictly positive");
+ }
+ this.maxConnections = maxConnections;
+ return this;
+ }
+
+ @Override
+ public Builder minConnections(int minConnections) {
+ if (minConnections < 0) {
+ throw new IllegalArgumentException("minConnections must be positive or zero");
+ }
+ this.minConnections = minConnections;
+ return this;
+ }
+ }
+}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
index f0103645bb..06ff01db89 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
@@ -496,7 +496,7 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
- poolConFig -> new Http2Pool(poolConFig, poolFactory.maxIdleTime(), poolFactory.maxLifeTime()));
+ poolConFig -> new Http2Pool(poolConFig, poolFactory.allocationStrategy(), poolFactory.maxIdleTime(), poolFactory.maxLifeTime()));
}
Publisher connectChannel() {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
index 31b2ff3308..989de99c41 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import io.netty.channel.ChannelHandlerContext;
@@ -48,6 +49,7 @@
import reactor.netty.internal.shaded.reactor.pool.PoolShutdownException;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
+import reactor.netty.resources.ConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
@@ -83,6 +85,10 @@
* {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
*
*
+ * If minimum connections is specified, the cached connections with active streams will be kept at that minimum
+ * (can be the best effort). However, if the cached connections have reached max concurrent streams,
+ * then new connections will be allocated up to the maximum connections limit.
+ *
* Configurations that are not applicable
*
* - {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
@@ -94,7 +100,6 @@
* - {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
* - FIFO is used when obtaining the pending borrowers
* - Warm up functionality is not supported
- * - Setting minimum connections configuration is not supported
*
* This class is based on
* https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java
@@ -144,23 +149,27 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip");
final Clock clock;
+ final Long maxConcurrentStreams;
final long maxIdleTime;
final long maxLifeTime;
+ final int minConnections;
final PoolConfig poolConfig;
+ final LongAdder totalMaxConcurrentStreams = new LongAdder();
long lastInteractionTimestamp;
Disposable evictionTask;
- Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) {
- if (poolConfig.allocationStrategy().getPermits(0) != 0) {
- throw new IllegalArgumentException("No support for configuring minimum number of connections");
- }
+ Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy,
+ long maxIdleTime, long maxLifeTime) {
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
+ this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ?
+ ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1;
this.maxIdleTime = maxIdleTime;
this.maxLifeTime = maxLifeTime;
+ this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum();
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;
@@ -353,7 +362,10 @@ void drainLoop() {
if (borrowersCount != 0) {
// find a connection that can be used for opening a new stream
- Slot slot = findConnection(resources);
+ // when cached connections are below minimum connections, then allocate a new connection
+ boolean belowMinConnections = minConnections > 0 &&
+ poolConfig.allocationStrategy().permitGranted() < minConnections;
+ Slot slot = belowMinConnections ? null : findConnection(resources);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
@@ -375,52 +387,64 @@ void drainLoop() {
});
}
else {
- int permits = poolConfig.allocationStrategy().getPermits(1);
- if (permits <= 0) {
- if (maxPending >= 0) {
- borrowersCount = pendingSize;
- int toCull = borrowersCount - maxPending;
- for (int i = 0; i < toCull; i++) {
- Borrower extraneous = pollPending(borrowers, true);
- if (extraneous != null) {
- pendingAcquireLimitReached(extraneous, maxPending);
- }
- }
- }
+ int resourcesCount = idleSize;
+ if (minConnections > 0 &&
+ poolConfig.allocationStrategy().permitGranted() >= minConnections &&
+ resourcesCount == 0) {
+ // connections allocations were triggered
}
else {
- Borrower borrower = pollPending(borrowers, true);
- if (borrower == null) {
- continue;
+ int permits = poolConfig.allocationStrategy().getPermits(1);
+ if (permits <= 0) {
+ if (maxPending >= 0) {
+ borrowersCount = pendingSize;
+ int toCull = borrowersCount - maxPending;
+ for (int i = 0; i < toCull; i++) {
+ Borrower extraneous = pollPending(borrowers, true);
+ if (extraneous != null) {
+ pendingAcquireLimitReached(extraneous, maxPending);
+ }
+ }
+ }
}
- if (isDisposed()) {
- borrower.fail(new PoolShutdownException());
- return;
+ else {
+ if (permits > 1) {
+ // warmup is not supported
+ poolConfig.allocationStrategy().returnPermits(permits - 1);
+ }
+ Borrower borrower = pollPending(borrowers, true);
+ if (borrower == null) {
+ continue;
+ }
+ if (isDisposed()) {
+ borrower.fail(new PoolShutdownException());
+ return;
+ }
+ borrower.stopPendingCountdown();
+ Mono allocator = poolConfig.allocator();
+ Mono primary =
+ allocator.doOnEach(sig -> {
+ if (sig.isOnNext()) {
+ Connection newInstance = sig.get();
+ assert newInstance != null;
+ Slot newSlot = new Slot(this, newInstance);
+ if (log.isDebugEnabled()) {
+ log.debug(format(newInstance.channel(), "Channel activated"));
+ }
+ ACQUIRED.incrementAndGet(this);
+ borrower.deliver(new Http2PooledRef(newSlot));
+ }
+ else if (sig.isOnError()) {
+ Throwable error = sig.getThrowable();
+ assert error != null;
+ poolConfig.allocationStrategy().returnPermits(1);
+ borrower.fail(error);
+ }
+ })
+ .contextWrite(borrower.currentContext());
+
+ primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
- borrower.stopPendingCountdown();
- Mono allocator = poolConfig.allocator();
- Mono primary =
- allocator.doOnEach(sig -> {
- if (sig.isOnNext()) {
- Connection newInstance = sig.get();
- assert newInstance != null;
- Slot newSlot = new Slot(this, newInstance);
- if (log.isDebugEnabled()) {
- log.debug(format(newInstance.channel(), "Channel activated"));
- }
- ACQUIRED.incrementAndGet(this);
- borrower.deliver(new Http2PooledRef(newSlot));
- }
- else if (sig.isOnError()) {
- Throwable error = sig.getThrowable();
- assert error != null;
- poolConfig.allocationStrategy().returnPermits(1);
- borrower.fail(error);
- }
- })
- .contextWrite(borrower.currentContext());
-
- primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
@@ -725,10 +749,10 @@ Context currentContext() {
@Override
public void request(long n) {
if (Operators.validate(n)) {
- // Cannot rely on idleSize because there might be idle connections but not suitable for use
+ long estimateStreamsCount = pool.totalMaxConcurrentStreams.longValue() - pool.acquired;
int permits = pool.poolConfig.allocationStrategy().estimatePermitCount();
int pending = pool.pendingSize;
- if (!acquireTimeout.isZero() && permits <= pending) {
+ if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) {
timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
pool.doAcquire(this);
@@ -889,6 +913,7 @@ static final class Slot extends AtomicBoolean {
final Http2Pool pool;
long idleTimestamp;
+ long maxConcurrentStreams;
volatile ChannelHandlerContext http2FrameCodecCtx;
volatile ChannelHandlerContext http2MultiplexHandlerCtx;
@@ -897,12 +922,26 @@ static final class Slot extends AtomicBoolean {
this.connection = connection;
this.creationTimestamp = pool.clock.millis();
this.pool = pool;
+ ChannelHandlerContext frameCodec = http2FrameCodecCtx();
+ if (frameCodec != null && http2MultiplexHandlerCtx() != null) {
+ this.maxConcurrentStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams();
+ this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams :
+ Math.min(pool.maxConcurrentStreams, maxConcurrentStreams);
+ }
+ this.pool.totalMaxConcurrentStreams.add(this.maxConcurrentStreams);
}
boolean canOpenStream() {
ChannelHandlerContext frameCodec = http2FrameCodecCtx();
if (frameCodec != null && http2MultiplexHandlerCtx() != null) {
- int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams();
+ long maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams();
+ maxActiveStreams = pool.maxConcurrentStreams == -1 ? maxActiveStreams :
+ Math.min(pool.maxConcurrentStreams, maxActiveStreams);
+ long diff = maxActiveStreams - maxConcurrentStreams;
+ if (diff != 0) {
+ maxConcurrentStreams = maxActiveStreams;
+ pool.totalMaxConcurrentStreams.add(diff);
+ }
int concurrency = this.concurrency;
return concurrency < maxActiveStreams;
}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java
new file mode 100644
index 0000000000..a96baf00e4
--- /dev/null
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package reactor.netty.http.client;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONCURRENT_STREAMS;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONNECTIONS;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MIN_CONNECTIONS;
+
+class Http2AllocationStrategyTest {
+ private Http2AllocationStrategy.Builder builder;
+
+ @BeforeEach
+ void setUp() {
+ builder = Http2AllocationStrategy.builder();
+ }
+
+ @Test
+ void build() {
+ builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(2);
+ assertThat(strategy.permitMaximum()).isEqualTo(2);
+ assertThat(strategy.permitMinimum()).isEqualTo(1);
+
+ }
+
+ @Test
+ void buildBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build())
+ .withMessage("minConnections must be less than or equal to maxConnections");
+ }
+
+ @Test
+ void maxConcurrentStreams() {
+ builder.maxConcurrentStreams(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(2);
+ assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS);
+ assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS);
+ }
+
+ @Test
+ void maxConcurrentStreamsBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConcurrentStreams(-2))
+ .withMessage("maxConcurrentStreams must be grater than or equal to -1");
+ }
+
+ @Test
+ void permitMaximum() {
+ builder.maxConnections(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS);
+ assertThat(strategy.permitMaximum()).isEqualTo(2);
+ assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS);
+ }
+
+ @Test
+ void permitMaximumBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConnections(0))
+ .withMessage("maxConnections must be strictly positive");
+ }
+
+ @Test
+ void permitMinimum() {
+ builder.minConnections(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS);
+ assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS);
+ assertThat(strategy.permitMinimum()).isEqualTo(2);
+ }
+
+ @Test
+ void permitMinimumBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.minConnections(-1))
+ .withMessage("minConnections must be positive or zero");
+ }
+}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
index ec317c8828..32606b377e 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
@@ -22,6 +22,7 @@
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException;
@@ -40,7 +41,6 @@
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
class Http2PoolTest {
@@ -53,7 +53,7 @@ void acquireInvalidate() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
List> acquired = new ArrayList<>();
@@ -94,7 +94,7 @@ void acquireRelease() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
List> acquired = new ArrayList<>();
@@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
@@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond)
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
@@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
@@ -554,7 +554,7 @@ void maxIdleTime() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
Connection connection1 = null;
Connection connection2 = null;
@@ -609,7 +609,7 @@ void maxIdleTimeActiveStreams() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
Connection connection1 = null;
Connection connection2 = null;
@@ -668,7 +668,7 @@ void maxLifeTime() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
Connection connection1 = null;
Connection connection2 = null;
@@ -732,7 +732,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50));
Connection connection1 = null;
Connection connection2 = null;
@@ -792,7 +792,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
Connection connection = null;
try {
@@ -831,11 +831,97 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
}
@Test
- void minConnectionsConfigNotSupported() {
+ void minConnections() {
+ EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
PoolBuilder> poolBuilder =
- PoolBuilder.from(Mono.empty()).sizeBetween(1, 2);
- assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1)));
+ PoolBuilder.from(Mono.just(Connection.from(channel)))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(1, 3);
+ Http2AllocationStrategy strategy = Http2AllocationStrategy.builder()
+ .maxConnections(3)
+ .minConnections(1)
+ .build();
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1));
+
+ List> acquired = new ArrayList<>();
+ try {
+ Flux.range(0, 3)
+ .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add))
+ .subscribe();
+
+ channel.runPendingTasks();
+
+ assertThat(acquired).hasSize(3);
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable());
+ assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable());
+
+ for (PooledRef slot : acquired) {
+ slot.release().block(Duration.ofSeconds(1));
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ }
+ finally {
+ for (PooledRef slot : acquired) {
+ Connection conn = slot.poolable();
+ ((EmbeddedChannel) conn.channel()).finishAndReleaseAll();
+ conn.dispose();
+ }
+ }
+ }
+
+ @Test
+ void minConnectionsMaxStreamsReached() {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(1, 3);
+ Http2AllocationStrategy strategy = Http2AllocationStrategy.builder()
+ .maxConnections(3)
+ .minConnections(1)
+ .build();
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1));
+
+ List> acquired = new ArrayList<>();
+ try {
+ Flux.range(0, 3)
+ .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add))
+ .blockLast(Duration.ofSeconds(1));
+
+ assertThat(acquired).hasSize(3);
+
+ for (PooledRef pooledRef : acquired) {
+ ((EmbeddedChannel) pooledRef.poolable().channel()).runPendingTasks();
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable());
+ assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable());
+ assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable());
+
+ for (PooledRef slot : acquired) {
+ slot.release().block(Duration.ofSeconds(1));
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ }
+ finally {
+ for (PooledRef slot : acquired) {
+ Connection conn = slot.poolable();
+ ((EmbeddedChannel) conn.channel()).finishAndReleaseAll();
+ conn.dispose();
+ }
+ }
}
@Test
@@ -846,7 +932,7 @@ void nonHttp2ConnectionEmittedOnce() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1));
diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
index 32453a4c24..0cad951be8 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
@@ -49,6 +49,7 @@
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
+import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
@@ -543,6 +544,86 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie
}
}
+ //https://github.com/reactor/reactor-netty/issues/1808
+ @Test
+ void testMinConnections() throws Exception {
+ Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
+ Http2SslContextSpec clientCtx =
+ Http2SslContextSpec.forClient()
+ .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));
+
+ disposableServer =
+ createServer()
+ .wiretap(false)
+ .protocol(HttpProtocol.H2)
+ .secure(spec -> spec.sslContext(serverCtx))
+ .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain())))
+ .bindNow();
+
+ int requestsNum = 100;
+ CountDownLatch latch = new CountDownLatch(1);
+ DefaultPooledConnectionProvider provider =
+ (DefaultPooledConnectionProvider) ConnectionProvider.builder("testMinConnections")
+ .allocationStrategy(Http2AllocationStrategy.builder().maxConnections(20).minConnections(5).build())
+ .build();
+ AtomicInteger counter = new AtomicInteger();
+ AtomicReference serverAddress = new AtomicReference<>();
+ HttpClient client =
+ createClient(provider, disposableServer.port())
+ .wiretap(false)
+ .protocol(HttpProtocol.H2)
+ .secure(spec -> spec.sslContext(clientCtx))
+ .metrics(true, Function.identity())
+ .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
+ .observe((conn, state) -> {
+ if (state == STREAM_CONFIGURED) {
+ counter.incrementAndGet();
+ conn.onTerminate()
+ .subscribe(null,
+ t -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }),
+ () -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }));
+ }
+ });
+
+ try {
+ Flux.range(0, requestsNum)
+ .flatMap(i ->
+ client.post()
+ .uri("/")
+ .send(ByteBufMono.fromString(Mono.just("testMinConnections")))
+ .responseContent()
+ .aggregate()
+ .asString())
+ .blockLast(Duration.ofSeconds(5));
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ InetSocketAddress sa = (InetSocketAddress) serverAddress.get();
+ String address = sa.getHostString() + ":" + sa.getPort();
+
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testMinConnections")).isEqualTo(0);
+ double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testMinConnections");
+ double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "testMinConnections");
+ assertThat(totalConn).isEqualTo(idleConn);
+ assertThat(totalConn).isLessThan(10);
+ }
+ finally {
+ provider.disposeLater()
+ .block(Duration.ofSeconds(5));
+ }
+ }
+
static final class TestPromise extends DefaultChannelPromise {
final ChannelPromise parent;