Skip to content

Commit

Permalink
Fix deadlock and couple more problems in DefaultConnectionPool (#699)
Browse files Browse the repository at this point in the history
This commit fixes three problems in `DefaultConnectionPool`/`ConcurrentPool`:
- deadlock in `DefaultConnectionPool`;
- unnecessary and harmful removal from `OpenConcurrencyLimiter.desiredConnectionSlots`;
- double release in `DefaultConnectionPool`/`ConcurrentPool`.


Deadlock in `DefaultConnectionPool`

a) Before introducing OpenConcurrencyLimiter, "AsyncGetter" thread was used only
to do blocking `ConcurrentPool.get`.
b) After, I started to additionally use "AsyncGetter" to do
blocking `OpenConcurrencyLimiter.waitUntilOpenPermitAvailable`.

As a result, we may have a thread that gets the last connection (`maxSize` is reached)
from `ConcurrentPool` and submits `waitUntilOpenPermitAvailable` to "AsyncGetter".
Concurrently with this happening, the "AsyncGetter" tries to get from `ConcurrentPool`
and is blocked because there are no more connections available. In such an execution,
`waitUntilOpenPermitAvailable` cannot be completed by "AsyncGetter" because
"AsyncGetter" is blocked doing a different task, which itself cannot be completed.

A solution is to do `ConcurrentPool.get` and `waitUntilOpenPermitAvailable`
in different threads. This way these two different kinds of tasks
will not block each other by waiting in the same queue to be done by a single thread.


Potential unnecessary and harmful removal from `OpenConcurrencyLimiter.desiredConnectionSlots`

If `acquirePermitOrGetAvailableOpenedConnection` is called with `true` as `tryGetAvailable`,
and `getPooledConnectionImmediately` throws an exception,
then `expressDesireToGetAvailableConnection` is not called
but `giveUpOnTryingToGetAvailableConnection` is still called, which is incorrect.


Double release in `DefaultConnectionPool`/`ConcurrentPool`

On one hand `ConcurrentPool.ensureMinSize` tries to not require the caller to do
what can be done by the method itself, e.g., it releases the connection to the pool itself.
On the other hand, always releasing in `ensureMinSize` may lead
to releasing a permit for the same connection twice, thus not respecting the `maxSize`.

It is not easy (requires an additional knob and logic)
to prevent the caller (`DefaultConnectionPool`) from releasing a connection
when initialization fails. It is, therefore, seems better to mandate that the caller
releases a connection if initialization fails.

JAVA-3927
  • Loading branch information
stIncMale committed May 1, 2021
1 parent 2e3d833 commit 73e3085
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 66 deletions.
4 changes: 2 additions & 2 deletions config/findbugs-exclude.xml
Expand Up @@ -190,10 +190,10 @@
<Bug pattern="VA_FORMAT_STRING_USES_NEWLINE"/>
</Match>

<!-- The return value of Condition.awaitNanos is ignored on purpose for infinite timeouts. -->
<!-- The method is a wrapper for `Condition.await`, naturally it does not call it in a loop. -->
<Match>
<Class name="com.mongodb.internal.connection.DefaultConnectionPool$OpenConcurrencyLimiter"/>
<Method name="awaitNanos"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
</FindBugsFilter>
Expand Up @@ -150,7 +150,7 @@ public T get(final long timeout, final TimeUnit timeUnit) {

T t = available.pollLast();
if (t == null) {
t = createNewAndReleasePermitIfFailure(noInit -> {});
t = createNewAndReleasePermitIfFailure();
}

return t;
Expand Down Expand Up @@ -197,28 +197,27 @@ public void prune() {
* The {@code postCreate} action throwing a exception causes this method to stop and re-throw that exception.
*
* @param initialize An action applied to non-{@code null} new items.
* If this action throws an {@link Exception}, it must release resources associated with the item.
* If an exception is thrown by the action, the action must treat the provided item as if obtained via
* a {@link #get(long, TimeUnit) get…} method, {@linkplain #release(Object, boolean) releasing} it
* if an exception is thrown; otherwise the action must not release the item.
*/
public void ensureMinSize(final int minSize, final Consumer<T> initialize) {
while (getCount() < minSize) {
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
}
T newItem = createNewAndReleasePermitIfFailure(initialize);
T newItem = createNewAndReleasePermitIfFailure();
initialize.accept(newItem);
release(newItem);
}
}

/**
* @param initialize See {@link #ensureMinSize(int, Consumer)}.
*/
private T createNewAndReleasePermitIfFailure(final Consumer<T> initialize) {
private T createNewAndReleasePermitIfFailure() {
try {
T newMember = itemFactory.create();
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
initialize.accept(newMember);
return newMember;
} catch (RuntimeException e) {
permits.release();
Expand Down
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
Expand All @@ -77,7 +78,6 @@
@SuppressWarnings("deprecation")
class DefaultConnectionPool implements ConnectionPool {
private static final Logger LOGGER = Loggers.getLogger("connection");
private static final Executor SAME_THREAD_EXECUTOR = Runnable::run;
/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
Expand All @@ -88,7 +88,7 @@ class DefaultConnectionPool implements ConnectionPool {
private final AtomicInteger generation = new AtomicInteger(0);
private final AtomicInteger lastPrunedGeneration = new AtomicInteger(0);
private final ScheduledExecutorService sizeMaintenanceTimer;
private ExecutorService asyncGetter;
private final Workers workers;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
Expand All @@ -107,6 +107,7 @@ class DefaultConnectionPool implements ConnectionPool {
sizeMaintenanceTimer = createMaintenanceTimer();
connectionPoolCreated(connectionPoolListener, serverId, settings);
openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING);
workers = new Workers();
}

@Override
Expand Down Expand Up @@ -166,9 +167,9 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
}

if (immediateConnection != null) {
openAsync(immediateConnection, timeout, getAsyncGetter(), eventSendingCallback);
openAsync(immediateConnection, timeout, eventSendingCallback);
} else {
getAsyncGetter().execute(() -> {
workers.getter().execute(() -> {
if (timeout.expired()) {
eventSendingCallback.onResult(null, createTimeoutException(timeout));
return;
Expand All @@ -180,7 +181,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
eventSendingCallback.onResult(null, e);
return;
}
openAsync(connection, timeout, SAME_THREAD_EXECUTOR, eventSendingCallback);
openAsync(connection, timeout, eventSendingCallback);
});
}
}
Expand All @@ -205,7 +206,7 @@ private Throwable checkOutFailed(final Throwable t) {
return result;
}

private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, final Executor executor,
private void openAsync(final PooledConnection pooledConnection, final Timeout timeout,
final SingleResultCallback<InternalConnection> callback) {
if (pooledConnection.opened()) {
if (LOGGER.isTraceEnabled()) {
Expand All @@ -218,20 +219,7 @@ private void openAsync(final PooledConnection pooledConnection, final Timeout ti
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
getId(pooledConnection), serverId));
}
executor.execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

private synchronized ExecutorService getAsyncGetter() {
if (asyncGetter == null) {
asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
return asyncGetter;
}

private synchronized void shutdownAsyncGetter() {
if (asyncGetter != null) {
asyncGetter.shutdownNow();
workers.opener().execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

Expand All @@ -249,7 +237,7 @@ public void close() {
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
shutdownAsyncGetter();
workers.close();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
}
Expand Down Expand Up @@ -710,7 +698,7 @@ public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConn
}

/**
* Package-private methods are thread-safe,
* Package-access methods are thread-safe,
* and only they should be called outside of the {@link OpenConcurrencyLimiter}'s code.
*/
@ThreadSafe
Expand Down Expand Up @@ -745,10 +733,10 @@ void openImmediately(final PooledConnection connection) throws MongoTimeoutExcep
* one becomes available while waiting for a permit.
* The first phase has one of the following outcomes:
* <ol>
* <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown.</li>
* <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown,
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
* <li>An opened connection different from the specified one is returned,
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
* </li>
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
* <li>A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported
* and an attempt to open the specified {@code connection} is made. This is the second phase in which
* the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}.
Expand Down Expand Up @@ -838,6 +826,7 @@ void openAsyncOrGetAvailable(
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, final Timeout timeout)
throws MongoTimeoutException {
PooledConnection availableConnection = null;
boolean expressedDesireToGetAvailableConnection = false;
tryLock(timeout);
try {
if (tryGetAvailable) {
Expand All @@ -855,6 +844,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
}
expressDesireToGetAvailableConnection();
expressedDesireToGetAvailableConnection = true;
}
long remainingNanos = timeout.remainingOrInfinite(NANOSECONDS);
while (permits == 0) {
Expand All @@ -865,7 +855,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
if (Timeout.expired(remainingNanos)) {
throw createTimeoutException(timeout);
}
remainingNanos = awaitNanos(remainingNanos);
remainingNanos = awaitNanos(condition, remainingNanos);
}
if (availableConnection == null) {
assertTrue(permits > 0);
Expand All @@ -874,7 +864,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
} finally {
try {
if (tryGetAvailable && availableConnection == null) {//the desired connection slot has not yet been removed
if (expressedDesireToGetAvailableConnection && availableConnection == null) {
giveUpOnTryingToGetAvailableConnection();
}
} finally {
Expand Down Expand Up @@ -968,17 +958,15 @@ private void tryLock(final Timeout timeout) throws MongoTimeoutException {
}

/**
* Returns {@code timeoutNanos} if {@code timeoutNanos} is negative, otherwise returns 0 or a positive value.
*
* @param timeoutNanos Use a negative value for an infinite timeout,
* in which case {@link Condition#awaitNanos(long)} is called with {@link Long#MAX_VALUE}.
* @param timeoutNanos See {@link Timeout#startNow(long)}.
* @return The remaining duration as per {@link Timeout#remainingOrInfinite(TimeUnit)} if waiting ended early either
* spuriously or because of receiving a signal.
*/
private long awaitNanos(final long timeoutNanos) {
private long awaitNanos(final Condition condition, final long timeoutNanos) throws MongoInterruptedException {
try {
if (timeoutNanos < 0) {
//noinspection ResultOfMethodCallIgnored
condition.awaitNanos(Long.MAX_VALUE);
return timeoutNanos;
if (timeoutNanos < 0 || timeoutNanos == Long.MAX_VALUE) {
condition.await();
return -1;
} else {
return Math.max(0, condition.awaitNanos(timeoutNanos));
}
Expand All @@ -997,4 +985,56 @@ private static final class MutableReference<T> {
private MutableReference() {
}
}

@ThreadSafe
private static class Workers implements AutoCloseable {
private volatile ExecutorService getter;
private volatile ExecutorService opener;
private final Lock lock;

Workers() {
lock = new StampedLock().asWriteLock();
}

Executor getter() {
if (getter == null) {
lock.lock();
try {
if (getter == null) {
getter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
} finally {
lock.unlock();
}
}
return getter;
}

Executor opener() {
if (opener == null) {
lock.lock();
try {
if (opener == null) {
opener = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncOpener"));
}
} finally {
lock.unlock();
}
}
return opener;
}

@Override
public void close() {
try {
if (getter != null) {
getter.shutdownNow();
}
} finally {
if (opener != null) {
opener.shutdownNow();
}
}
}
}
}

0 comments on commit 73e3085

Please sign in to comment.