diff --git a/config/findbugs-exclude.xml b/config/findbugs-exclude.xml
index bbc6f41e2ee..ce5368d4bdd 100644
--- a/config/findbugs-exclude.xml
+++ b/config/findbugs-exclude.xml
@@ -190,10 +190,10 @@
-
+
-
+
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java
index f66e702ad20..d675cdfaf90 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java
@@ -150,7 +150,7 @@ public T get(final long timeout, final TimeUnit timeUnit) {
T t = available.pollLast();
if (t == null) {
- t = createNewAndReleasePermitIfFailure(noInit -> {});
+ t = createNewAndReleasePermitIfFailure();
}
return t;
@@ -197,28 +197,27 @@ public void prune() {
* The {@code postCreate} action throwing a exception causes this method to stop and re-throw that exception.
*
* @param initialize An action applied to non-{@code null} new items.
- * If this action throws an {@link Exception}, it must release resources associated with the item.
+ * If an exception is thrown by the action, the action must treat the provided item as if obtained via
+ * a {@link #get(long, TimeUnit) get…} method, {@linkplain #release(Object, boolean) releasing} it
+ * if an exception is thrown; otherwise the action must not release the item.
*/
public void ensureMinSize(final int minSize, final Consumer initialize) {
while (getCount() < minSize) {
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
}
- T newItem = createNewAndReleasePermitIfFailure(initialize);
+ T newItem = createNewAndReleasePermitIfFailure();
+ initialize.accept(newItem);
release(newItem);
}
}
- /**
- * @param initialize See {@link #ensureMinSize(int, Consumer)}.
- */
- private T createNewAndReleasePermitIfFailure(final Consumer initialize) {
+ private T createNewAndReleasePermitIfFailure() {
try {
T newMember = itemFactory.create();
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
- initialize.accept(newMember);
return newMember;
} catch (RuntimeException e) {
permits.release();
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java
index 7687af2cc50..9e078af7fe1 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java
@@ -62,6 +62,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.StampedLock;
import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
@@ -77,7 +78,6 @@
@SuppressWarnings("deprecation")
class DefaultConnectionPool implements ConnectionPool {
private static final Logger LOGGER = Loggers.getLogger("connection");
- private static final Executor SAME_THREAD_EXECUTOR = Runnable::run;
/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
@@ -88,7 +88,7 @@ class DefaultConnectionPool implements ConnectionPool {
private final AtomicInteger generation = new AtomicInteger(0);
private final AtomicInteger lastPrunedGeneration = new AtomicInteger(0);
private final ScheduledExecutorService sizeMaintenanceTimer;
- private ExecutorService asyncGetter;
+ private final Workers workers;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
@@ -107,6 +107,7 @@ class DefaultConnectionPool implements ConnectionPool {
sizeMaintenanceTimer = createMaintenanceTimer();
connectionPoolCreated(connectionPoolListener, serverId, settings);
openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING);
+ workers = new Workers();
}
@Override
@@ -166,9 +167,9 @@ public void getAsync(final SingleResultCallback callback) {
}
if (immediateConnection != null) {
- openAsync(immediateConnection, timeout, getAsyncGetter(), eventSendingCallback);
+ openAsync(immediateConnection, timeout, eventSendingCallback);
} else {
- getAsyncGetter().execute(() -> {
+ workers.getter().execute(() -> {
if (timeout.expired()) {
eventSendingCallback.onResult(null, createTimeoutException(timeout));
return;
@@ -180,7 +181,7 @@ public void getAsync(final SingleResultCallback callback) {
eventSendingCallback.onResult(null, e);
return;
}
- openAsync(connection, timeout, SAME_THREAD_EXECUTOR, eventSendingCallback);
+ openAsync(connection, timeout, eventSendingCallback);
});
}
}
@@ -205,7 +206,7 @@ private Throwable checkOutFailed(final Throwable t) {
return result;
}
- private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, final Executor executor,
+ private void openAsync(final PooledConnection pooledConnection, final Timeout timeout,
final SingleResultCallback callback) {
if (pooledConnection.opened()) {
if (LOGGER.isTraceEnabled()) {
@@ -218,20 +219,7 @@ private void openAsync(final PooledConnection pooledConnection, final Timeout ti
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
getId(pooledConnection), serverId));
}
- executor.execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
- }
- }
-
- private synchronized ExecutorService getAsyncGetter() {
- if (asyncGetter == null) {
- asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
- }
- return asyncGetter;
- }
-
- private synchronized void shutdownAsyncGetter() {
- if (asyncGetter != null) {
- asyncGetter.shutdownNow();
+ workers.opener().execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}
@@ -249,7 +237,7 @@ public void close() {
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
- shutdownAsyncGetter();
+ workers.close();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
}
@@ -710,7 +698,7 @@ public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConn
}
/**
- * Package-private methods are thread-safe,
+ * Package-access methods are thread-safe,
* and only they should be called outside of the {@link OpenConcurrencyLimiter}'s code.
*/
@ThreadSafe
@@ -745,10 +733,10 @@ void openImmediately(final PooledConnection connection) throws MongoTimeoutExcep
* one becomes available while waiting for a permit.
* The first phase has one of the following outcomes:
*
- * - A {@link MongoTimeoutException} or a different {@link Exception} is thrown.
+ * - A {@link MongoTimeoutException} or a different {@link Exception} is thrown,
+ * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
* - An opened connection different from the specified one is returned,
- * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
- *
+ * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
* - A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported
* and an attempt to open the specified {@code connection} is made. This is the second phase in which
* the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}.
@@ -838,6 +826,7 @@ void openAsyncOrGetAvailable(
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, final Timeout timeout)
throws MongoTimeoutException {
PooledConnection availableConnection = null;
+ boolean expressedDesireToGetAvailableConnection = false;
tryLock(timeout);
try {
if (tryGetAvailable) {
@@ -855,6 +844,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
}
expressDesireToGetAvailableConnection();
+ expressedDesireToGetAvailableConnection = true;
}
long remainingNanos = timeout.remainingOrInfinite(NANOSECONDS);
while (permits == 0) {
@@ -865,7 +855,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
if (Timeout.expired(remainingNanos)) {
throw createTimeoutException(timeout);
}
- remainingNanos = awaitNanos(remainingNanos);
+ remainingNanos = awaitNanos(condition, remainingNanos);
}
if (availableConnection == null) {
assertTrue(permits > 0);
@@ -874,7 +864,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
} finally {
try {
- if (tryGetAvailable && availableConnection == null) {//the desired connection slot has not yet been removed
+ if (expressedDesireToGetAvailableConnection && availableConnection == null) {
giveUpOnTryingToGetAvailableConnection();
}
} finally {
@@ -968,17 +958,15 @@ private void tryLock(final Timeout timeout) throws MongoTimeoutException {
}
/**
- * Returns {@code timeoutNanos} if {@code timeoutNanos} is negative, otherwise returns 0 or a positive value.
- *
- * @param timeoutNanos Use a negative value for an infinite timeout,
- * in which case {@link Condition#awaitNanos(long)} is called with {@link Long#MAX_VALUE}.
+ * @param timeoutNanos See {@link Timeout#startNow(long)}.
+ * @return The remaining duration as per {@link Timeout#remainingOrInfinite(TimeUnit)} if waiting ended early either
+ * spuriously or because of receiving a signal.
*/
- private long awaitNanos(final long timeoutNanos) {
+ private long awaitNanos(final Condition condition, final long timeoutNanos) throws MongoInterruptedException {
try {
- if (timeoutNanos < 0) {
- //noinspection ResultOfMethodCallIgnored
- condition.awaitNanos(Long.MAX_VALUE);
- return timeoutNanos;
+ if (timeoutNanos < 0 || timeoutNanos == Long.MAX_VALUE) {
+ condition.await();
+ return -1;
} else {
return Math.max(0, condition.awaitNanos(timeoutNanos));
}
@@ -997,4 +985,56 @@ private static final class MutableReference {
private MutableReference() {
}
}
+
+ @ThreadSafe
+ private static class Workers implements AutoCloseable {
+ private volatile ExecutorService getter;
+ private volatile ExecutorService opener;
+ private final Lock lock;
+
+ Workers() {
+ lock = new StampedLock().asWriteLock();
+ }
+
+ Executor getter() {
+ if (getter == null) {
+ lock.lock();
+ try {
+ if (getter == null) {
+ getter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return getter;
+ }
+
+ Executor opener() {
+ if (opener == null) {
+ lock.lock();
+ try {
+ if (opener == null) {
+ opener = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncOpener"));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return opener;
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (getter != null) {
+ getter.shutdownNow();
+ }
+ } finally {
+ if (opener != null) {
+ opener.shutdownNow();
+ }
+ }
+ }
+ }
}
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
index 766d5447a78..b4e8a88c115 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java
@@ -25,9 +25,12 @@
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.internal.Timeout;
import com.mongodb.internal.async.SingleResultCallback;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,6 +46,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
import static com.mongodb.internal.connection.DefaultConnectionPool.MAX_CONNECTING;
import static java.lang.Long.MAX_VALUE;
@@ -50,10 +54,10 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -72,15 +76,16 @@ public class DefaultConnectionPoolTest {
private DefaultConnectionPool provider;
private ExecutorService cachedExecutor;
- @Before
+ @BeforeEach
public void setUp() {
connectionFactory = new TestInternalConnectionFactory();
cachedExecutor = Executors.newCachedThreadPool();
}
- @After
+ @AfterEach
@SuppressWarnings("try")
public void cleanup() throws InterruptedException {
+ //noinspection unused
try (DefaultConnectionPool closed = provider) {
cachedExecutor.shutdownNow();
//noinspection ResultOfMethodCallIgnored
@@ -236,18 +241,30 @@ public void shouldPruneAfterMaintenanceTaskRuns() throws InterruptedException {
assertTrue(connectionFactory.getCreatedConnections().get(0).isClosed());
}
- @Test
- public void concurrentUsage() throws InterruptedException {
- int maxAvailableConnections = 7;
+ @ParameterizedTest
+ @MethodSource("concurrentUsageArguments")
+ public void concurrentUsage(final int minSize, final int maxSize, final int concurrentUsersCount,
+ final boolean checkoutSync, final boolean checkoutAsync, final float invalidateProb)
+ throws InterruptedException {
ControllableConnectionFactory controllableConnFactory = newControllableConnectionFactory(cachedExecutor);
provider = new DefaultConnectionPool(SERVER_ID, controllableConnFactory.factory, ConnectionPoolSettings.builder()
- .maxSize(MAX_CONNECTING + maxAvailableConnections)
- .minSize(2)
+ .minSize(minSize)
+ .maxSize(maxSize)
.maxWaitTime(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS)
.maintenanceInitialDelay(0, NANOSECONDS)
.maintenanceFrequency(100, MILLISECONDS)
.build());
- assertUseConcurrently(provider, 2 * maxAvailableConnections, cachedExecutor, SECONDS.toNanos(15));
+ assertUseConcurrently(provider, concurrentUsersCount, checkoutSync, checkoutAsync, invalidateProb,
+ cachedExecutor, SECONDS.toNanos(10));
+ }
+
+ private static Stream concurrentUsageArguments() {
+ return Stream.of(
+ Arguments.of(0, 1, 8, true, false, 0.02f),
+ Arguments.of(0, 1, 8, false, true, 0.02f),
+ Arguments.of(Math.min(3, MAX_CONNECTING), MAX_CONNECTING, 8, true, true, 0f),
+ Arguments.of(MAX_CONNECTING + 5, MAX_CONNECTING + 5, 2 * (MAX_CONNECTING + 5), true, true, 0.02f),
+ Arguments.of(Math.min(3, MAX_CONNECTING), MAX_CONNECTING + 5, 2 * (MAX_CONNECTING + 5), true, true, 0.9f));
}
@Test
@@ -262,7 +279,8 @@ public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() t
.maintenanceInitialDelay(MAX_VALUE, NANOSECONDS)
.build());
acquireOpenPermits(provider, MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_CALLBACK, controllableConnFactory, listener);
- assertUseConcurrently(provider, 2 * maxAvailableConnections, cachedExecutor, SECONDS.toNanos(10));
+ assertUseConcurrently(provider, 2 * maxAvailableConnections, true, true, 0.02f,
+ cachedExecutor, SECONDS.toNanos(10));
}
/**
@@ -339,33 +357,36 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce
}
private static void assertUseConcurrently(final DefaultConnectionPool pool, final int concurrentUsersCount,
+ final boolean sync, final boolean async, final float invalidateProb,
final ExecutorService executor, final long durationNanos) throws InterruptedException {
try {
- useConcurrently(pool, concurrentUsersCount, executor, durationNanos);
+ useConcurrently(pool, concurrentUsersCount, sync, async, invalidateProb, executor, durationNanos);
} catch (TimeoutException | ExecutionException e) {
throw new AssertionError(e);
}
}
private static void useConcurrently(final DefaultConnectionPool pool, final int concurrentUsersCount,
+ final boolean checkoutSync, final boolean checkoutAsync, final float invalidateProb,
final ExecutorService executor, final long durationNanos)
throws ExecutionException, InterruptedException, TimeoutException {
+ assertTrue(invalidateProb >= 0 && invalidateProb <= 1);
Runnable spontaneouslyInvalidate = () -> {
- if (ThreadLocalRandom.current().nextFloat() < 0.02) {
+ if (ThreadLocalRandom.current().nextFloat() < invalidateProb) {
pool.invalidate();
}
};
Collection> tasks = new ArrayList<>();
Timeout duration = Timeout.startNow(durationNanos);
for (int i = 0; i < concurrentUsersCount; i++) {
- if (i % 2 == 0) {//check out synchronously
+ if ((checkoutSync && checkoutAsync) ? i % 2 == 0 : checkoutSync) {//check out synchronously
tasks.add(executor.submit(() -> {
while (!(duration.expired() || Thread.currentThread().isInterrupted())) {
spontaneouslyInvalidate.run();
pool.get(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS).close();
}
}));
- } else {//check out asynchronously
+ } else if (checkoutAsync) {//check out asynchronously
tasks.add(executor.submit(() -> {
while (!(duration.expired() || Thread.currentThread().isInterrupted())) {
spontaneouslyInvalidate.run();