Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock and couple more problems in DefaultConnectionPool #699

Merged
merged 4 commits into from May 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/findbugs-exclude.xml
Expand Up @@ -190,10 +190,10 @@
<Bug pattern="VA_FORMAT_STRING_USES_NEWLINE"/>
</Match>

<!-- The return value of Condition.awaitNanos is ignored on purpose for infinite timeouts. -->
<!-- The method is a wrapper for `Condition.await`, naturally it does not call it in a loop. -->
<Match>
<Class name="com.mongodb.internal.connection.DefaultConnectionPool$OpenConcurrencyLimiter"/>
<Method name="awaitNanos"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
</FindBugsFilter>
Expand Up @@ -150,7 +150,7 @@ public T get(final long timeout, final TimeUnit timeUnit) {

T t = available.pollLast();
if (t == null) {
t = createNewAndReleasePermitIfFailure(noInit -> {});
t = createNewAndReleasePermitIfFailure();
}

return t;
Expand Down Expand Up @@ -197,28 +197,27 @@ public void prune() {
* The {@code postCreate} action throwing a exception causes this method to stop and re-throw that exception.
*
* @param initialize An action applied to non-{@code null} new items.
* If this action throws an {@link Exception}, it must release resources associated with the item.
* If an exception is thrown by the action, the action must treat the provided item as if obtained via
* a {@link #get(long, TimeUnit) get…} method, {@linkplain #release(Object, boolean) releasing} it
* if an exception is thrown; otherwise the action must not release the item.
*/
public void ensureMinSize(final int minSize, final Consumer<T> initialize) {
while (getCount() < minSize) {
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
}
T newItem = createNewAndReleasePermitIfFailure(initialize);
T newItem = createNewAndReleasePermitIfFailure();
initialize.accept(newItem);
release(newItem);
rozza marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* @param initialize See {@link #ensureMinSize(int, Consumer)}.
*/
private T createNewAndReleasePermitIfFailure(final Consumer<T> initialize) {
private T createNewAndReleasePermitIfFailure() {
try {
T newMember = itemFactory.create();
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
initialize.accept(newMember);
return newMember;
} catch (RuntimeException e) {
permits.release();
Expand Down
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
Expand All @@ -77,7 +78,6 @@
@SuppressWarnings("deprecation")
class DefaultConnectionPool implements ConnectionPool {
private static final Logger LOGGER = Loggers.getLogger("connection");
private static final Executor SAME_THREAD_EXECUTOR = Runnable::run;
/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
Expand All @@ -88,7 +88,7 @@ class DefaultConnectionPool implements ConnectionPool {
private final AtomicInteger generation = new AtomicInteger(0);
private final AtomicInteger lastPrunedGeneration = new AtomicInteger(0);
private final ScheduledExecutorService sizeMaintenanceTimer;
private ExecutorService asyncGetter;
private final Workers workers;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
Expand All @@ -107,6 +107,7 @@ class DefaultConnectionPool implements ConnectionPool {
sizeMaintenanceTimer = createMaintenanceTimer();
connectionPoolCreated(connectionPoolListener, serverId, settings);
openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING);
workers = new Workers();
}

@Override
Expand Down Expand Up @@ -166,9 +167,9 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
}

if (immediateConnection != null) {
openAsync(immediateConnection, timeout, getAsyncGetter(), eventSendingCallback);
openAsync(immediateConnection, timeout, eventSendingCallback);
} else {
getAsyncGetter().execute(() -> {
workers.getter().execute(() -> {
if (timeout.expired()) {
eventSendingCallback.onResult(null, createTimeoutException(timeout));
return;
Expand All @@ -180,7 +181,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
eventSendingCallback.onResult(null, e);
return;
}
openAsync(connection, timeout, SAME_THREAD_EXECUTOR, eventSendingCallback);
openAsync(connection, timeout, eventSendingCallback);
});
}
}
Expand All @@ -205,7 +206,7 @@ private Throwable checkOutFailed(final Throwable t) {
return result;
}

private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, final Executor executor,
private void openAsync(final PooledConnection pooledConnection, final Timeout timeout,
final SingleResultCallback<InternalConnection> callback) {
if (pooledConnection.opened()) {
if (LOGGER.isTraceEnabled()) {
Expand All @@ -218,20 +219,7 @@ private void openAsync(final PooledConnection pooledConnection, final Timeout ti
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
getId(pooledConnection), serverId));
}
executor.execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

private synchronized ExecutorService getAsyncGetter() {
if (asyncGetter == null) {
asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
return asyncGetter;
}

private synchronized void shutdownAsyncGetter() {
if (asyncGetter != null) {
asyncGetter.shutdownNow();
workers.opener().execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

Expand All @@ -249,7 +237,7 @@ public void close() {
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
shutdownAsyncGetter();
workers.close();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
}
Expand Down Expand Up @@ -710,7 +698,7 @@ public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConn
}

/**
* Package-private methods are thread-safe,
* Package-access methods are thread-safe,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"package-private" is not a term the JLS uses, while "package-access" is.

* and only they should be called outside of the {@link OpenConcurrencyLimiter}'s code.
*/
@ThreadSafe
Expand Down Expand Up @@ -745,10 +733,10 @@ void openImmediately(final PooledConnection connection) throws MongoTimeoutExcep
* one becomes available while waiting for a permit.
* The first phase has one of the following outcomes:
* <ol>
* <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown.</li>
* <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown,
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
* <li>An opened connection different from the specified one is returned,
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
* </li>
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
* <li>A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported
* and an attempt to open the specified {@code connection} is made. This is the second phase in which
* the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}.
Expand Down Expand Up @@ -838,6 +826,7 @@ void openAsyncOrGetAvailable(
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, final Timeout timeout)
throws MongoTimeoutException {
PooledConnection availableConnection = null;
boolean expressedDesireToGetAvailableConnection = false;
tryLock(timeout);
try {
if (tryGetAvailable) {
Expand All @@ -855,6 +844,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
}
expressDesireToGetAvailableConnection();
expressedDesireToGetAvailableConnection = true;
}
long remainingNanos = timeout.remainingOrInfinite(NANOSECONDS);
while (permits == 0) {
Expand All @@ -865,7 +855,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
if (Timeout.expired(remainingNanos)) {
throw createTimeoutException(timeout);
}
remainingNanos = awaitNanos(remainingNanos);
remainingNanos = awaitNanos(condition, remainingNanos);
}
if (availableConnection == null) {
assertTrue(permits > 0);
Expand All @@ -874,7 +864,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
} finally {
try {
if (tryGetAvailable && availableConnection == null) {//the desired connection slot has not yet been removed
if (expressedDesireToGetAvailableConnection && availableConnection == null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the second problem I discovered while implementing pausable pool. getPooledConnectionImmediately there may throw an exception, which in turn may result in this code calling giveUpOnTryingToGetAvailableConnection despite expressDesireToGetAvailableConnection not being called.

This is an example of how one should try not to rely if possible on methods not throwing exceptions.

giveUpOnTryingToGetAvailableConnection();
}
} finally {
Expand Down Expand Up @@ -968,17 +958,15 @@ private void tryLock(final Timeout timeout) throws MongoTimeoutException {
}

/**
* Returns {@code timeoutNanos} if {@code timeoutNanos} is negative, otherwise returns 0 or a positive value.
*
* @param timeoutNanos Use a negative value for an infinite timeout,
* in which case {@link Condition#awaitNanos(long)} is called with {@link Long#MAX_VALUE}.
* @param timeoutNanos See {@link Timeout#startNow(long)}.
* @return The remaining duration as per {@link Timeout#remainingOrInfinite(TimeUnit)} if waiting ended early either
* spuriously or because of receiving a signal.
*/
private long awaitNanos(final long timeoutNanos) {
private long awaitNanos(final Condition condition, final long timeoutNanos) throws MongoInterruptedException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes the behavior of this method consistent with Timeout and allows to see explicitly what condition it waits for on the calling side.

try {
if (timeoutNanos < 0) {
//noinspection ResultOfMethodCallIgnored
condition.awaitNanos(Long.MAX_VALUE);
return timeoutNanos;
if (timeoutNanos < 0 || timeoutNanos == Long.MAX_VALUE) {
condition.await();
return -1;
jyemin marked this conversation as resolved.
Show resolved Hide resolved
} else {
return Math.max(0, condition.awaitNanos(timeoutNanos));
}
Expand All @@ -997,4 +985,56 @@ private static final class MutableReference<T> {
private MutableReference() {
}
}

@ThreadSafe
private static class Workers implements AutoCloseable {
private volatile ExecutorService getter;
private volatile ExecutorService opener;
private final Lock lock;

Workers() {
lock = new StampedLock().asWriteLock();
}

Executor getter() {
if (getter == null) {
lock.lock();
try {
if (getter == null) {
getter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
} finally {
lock.unlock();
}
}
return getter;
}

Executor opener() {
if (opener == null) {
lock.lock();
try {
if (opener == null) {
opener = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncOpener"));
}
} finally {
lock.unlock();
}
}
return opener;
}

@Override
public void close() {
try {
if (getter != null) {
getter.shutdownNow();
}
} finally {
if (opener != null) {
opener.shutdownNow();
}
}
}
}
}