From 058d9f4af7c519f34612bd4e4a23c52af08fbf7f Mon Sep 17 00:00:00 2001 From: Jeff Yemin Date: Thu, 10 Feb 2022 16:16:55 -0500 Subject: [PATCH] Make reference count check atomic with release (#876) JAVA-4490 Co-authored-by: Valentin Kovalenko --- .../async/client/ClientSessionBinding.java | 38 +++++++++---------- .../binding/AbstractReferenceCounted.java | 6 ++- .../AsyncClusterAwareReadWriteBinding.java | 3 ++ .../internal/binding/AsyncClusterBinding.java | 7 ++-- .../internal/binding/ClusterBinding.java | 5 ++- .../internal/binding/ReferenceCounted.java | 12 +++++- .../internal/binding/SingleServerBinding.java | 7 ++-- .../internal/binding/TransactionContext.java | 7 ++-- .../connection/AbstractReferenceCounted.java | 6 ++- .../internal/connection/DefaultServer.java | 14 ++++--- .../connection/DefaultServerConnection.java | 9 ++--- .../client/syncadapter/SyncConnection.java | 4 +- .../internal/binding/AsyncSessionBinding.java | 8 ++-- .../binding/AsyncSingleConnectionBinding.java | 14 ++++--- .../internal/binding/SessionBinding.java | 8 ++-- .../binding/SingleConnectionBinding.java | 8 ++-- .../ClientSessionBindingSpecification.groovy | 1 + .../internal/connection/TestConnection.java | 3 +- .../AsyncQueryBatchCursorSpecification.groovy | 2 + .../QueryBatchCursorSpecification.groovy | 2 + .../client/internal/crypt/CryptBinding.java | 11 +++--- .../internal/crypt/CryptConnection.java | 4 +- .../client/internal/ClientSessionBinding.java | 33 +++++++++------- .../mongodb/client/internal/CryptBinding.java | 8 ++-- .../client/internal/CryptConnection.java | 4 +- 25 files changed, 131 insertions(+), 93 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java b/driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java index 86abe6d653e..4e0122775c0 100644 --- a/driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java +++ b/driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java @@ -23,6 +23,7 @@ import com.mongodb.connection.ClusterType; import com.mongodb.connection.ServerDescription; import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AbstractReferenceCounted; import com.mongodb.internal.binding.AsyncClusterAwareReadWriteBinding; import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncReadWriteBinding; @@ -36,7 +37,7 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.connection.ClusterType.LOAD_BALANCED; -public class ClientSessionBinding implements AsyncReadWriteBinding { +public class ClientSessionBinding extends AbstractReferenceCounted implements AsyncReadWriteBinding { private final AsyncClusterAwareReadWriteBinding wrapped; private final AsyncClientSession session; private final boolean ownsSession; @@ -44,7 +45,7 @@ public class ClientSessionBinding implements AsyncReadWriteBinding { public ClientSessionBinding(final AsyncClientSession session, final boolean ownsSession, final AsyncClusterAwareReadWriteBinding wrapped) { - this.wrapped = notNull("wrapped", (wrapped)); + this.wrapped = notNull("wrapped", wrapped).retain(); this.ownsSession = ownsSession; this.session = notNull("session", session); this.sessionContext = new AsyncClientSessionContext(session); @@ -113,14 +114,9 @@ private void getPinnedConnectionSource(final boolean isRead, final SingleResultC } } - @Override - public int getCount() { - return wrapped.getCount(); - } - @Override public AsyncReadWriteBinding retain() { - wrapped.retain(); + super.retain(); return this; } @@ -131,15 +127,15 @@ public void getReadConnectionSource(final int minWireVersion, final ReadPreferen } @Override - public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); - } - - private void closeSessionIfCountIsZero() { - if (getCount() == 0 && ownsSession) { - session.close(); + public int release() { + int count = super.release(); + if (count == 0) { + wrapped.release(); + if (ownsSession) { + session.close(); + } } + return count; } private boolean isConnectionSourcePinningRequired() { @@ -152,6 +148,7 @@ private class SessionBindingAsyncConnectionSource implements AsyncConnectionSour SessionBindingAsyncConnectionSource(final AsyncConnectionSource wrapped) { this.wrapped = wrapped; + ClientSessionBinding.this.retain(); } @Override @@ -214,9 +211,12 @@ public int getCount() { } @Override - public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); + public int release() { + int count = wrapped.release(); + if (count == 0) { + ClientSessionBinding.this.release(); + } + return count; } } diff --git a/driver-core/src/main/com/mongodb/internal/binding/AbstractReferenceCounted.java b/driver-core/src/main/com/mongodb/internal/binding/AbstractReferenceCounted.java index ea7c30c2864..0b1c0a419d9 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/AbstractReferenceCounted.java +++ b/driver-core/src/main/com/mongodb/internal/binding/AbstractReferenceCounted.java @@ -34,9 +34,11 @@ public ReferenceCounted retain() { } @Override - public void release() { - if (referenceCount.decrementAndGet() < 0) { + public int release() { + int decrementedValue = referenceCount.decrementAndGet(); + if (decrementedValue < 0) { throw new IllegalStateException("Attempted to decrement the reference count below 0"); } + return decrementedValue; } } diff --git a/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterAwareReadWriteBinding.java b/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterAwareReadWriteBinding.java index a0eb42bba92..ede070bf621 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterAwareReadWriteBinding.java +++ b/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterAwareReadWriteBinding.java @@ -33,4 +33,7 @@ public interface AsyncClusterAwareReadWriteBinding extends AsyncReadWriteBinding * @param callback the to be passed the connection source */ void getConnectionSource(ServerAddress serverAddress, SingleResultCallback callback); + + @Override + AsyncClusterAwareReadWriteBinding retain(); } diff --git a/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterBinding.java b/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterBinding.java index 3bc479b1d62..46ae76e12fb 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterBinding.java +++ b/driver-core/src/main/com/mongodb/internal/binding/AsyncClusterBinding.java @@ -73,7 +73,7 @@ public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPrefe } @Override - public AsyncReadWriteBinding retain() { + public AsyncClusterAwareReadWriteBinding retain() { super.retain(); return this; } @@ -208,9 +208,10 @@ public AsyncConnectionSource retain() { } @Override - public void release() { - super.release(); + public int release() { + int count = super.release(); AsyncClusterBinding.this.release(); + return count; } } } diff --git a/driver-core/src/main/com/mongodb/internal/binding/ClusterBinding.java b/driver-core/src/main/com/mongodb/internal/binding/ClusterBinding.java index cee16e15ab5..552778b9ffe 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/ClusterBinding.java +++ b/driver-core/src/main/com/mongodb/internal/binding/ClusterBinding.java @@ -182,9 +182,10 @@ public ConnectionSource retain() { } @Override - public void release() { - super.release(); + public int release() { + int count = super.release(); ClusterBinding.this.release(); + return count; } } } diff --git a/driver-core/src/main/com/mongodb/internal/binding/ReferenceCounted.java b/driver-core/src/main/com/mongodb/internal/binding/ReferenceCounted.java index b7ea86ea8e7..27d9df9a695 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/ReferenceCounted.java +++ b/driver-core/src/main/com/mongodb/internal/binding/ReferenceCounted.java @@ -16,6 +16,10 @@ package com.mongodb.internal.binding; +import com.mongodb.internal.VisibleForTesting; + +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; + /** * An interface for reference-counted objects. *

@@ -39,9 +43,14 @@ public interface ReferenceCounted { /** * Gets the current reference count. * + *

+ * This method should only be used for testing. Production code should prefer using the count returned from {@link #release()} + *

+ * * @return the current count, which must be greater than or equal to 0. * Returns 1 for a newly created object. */ + @VisibleForTesting(otherwise = PRIVATE) int getCount(); /** @@ -54,6 +63,7 @@ public interface ReferenceCounted { /** * Release a reference to this object. * @throws java.lang.IllegalStateException if the reference count is already 0 + * @return the reference count after the release */ - void release(); + int release(); } diff --git a/driver-core/src/main/com/mongodb/internal/binding/SingleServerBinding.java b/driver-core/src/main/com/mongodb/internal/binding/SingleServerBinding.java index 17634b727bf..d572cda8aab 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/SingleServerBinding.java +++ b/driver-core/src/main/com/mongodb/internal/binding/SingleServerBinding.java @@ -146,11 +146,12 @@ public ConnectionSource retain() { } @Override - public void release() { - super.release(); - if (super.getCount() == 0) { + public int release() { + int count = super.release(); + if (count == 0) { SingleServerBinding.this.release(); } + return count; } } } diff --git a/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java b/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java index 1c4e327e1b2..6c96f1f6c5a 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java +++ b/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java @@ -48,13 +48,14 @@ public boolean isConnectionPinningRequired() { } @Override - public void release() { - super.release(); - if (getCount() == 0) { + public int release() { + int count = super.release(); + if (count == 0) { if (pinnedConnection != null) { pinnedConnection.release(); } } + return count; } @SuppressWarnings("unchecked") diff --git a/driver-core/src/main/com/mongodb/internal/connection/AbstractReferenceCounted.java b/driver-core/src/main/com/mongodb/internal/connection/AbstractReferenceCounted.java index b30fa93846a..a2411f410c7 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AbstractReferenceCounted.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AbstractReferenceCounted.java @@ -36,9 +36,11 @@ public ReferenceCounted retain() { } @Override - public void release() { - if (referenceCount.decrementAndGet() < 0) { + public int release() { + int decrementedValue = referenceCount.decrementAndGet(); + if (decrementedValue < 0) { throw new IllegalStateException("Attempted to decrement the reference count below 0"); } + return decrementedValue; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java index 28fa2997821..2efdaff7a10 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java @@ -301,11 +301,12 @@ public int getCount() { } @Override - public void release() { - wrapped.release(); - if (getCount() == 0) { + public int release() { + int count = wrapped.release(); + if (count == 0) { server.operationEnd(); } + return count; } @Override @@ -401,11 +402,12 @@ public int getCount() { } @Override - public void release() { - wrapped.release(); - if (getCount() == 0) { + public int release() { + int count = wrapped.release(); + if (count == 0) { server.operationEnd(); } + return count; } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java index 267b8ece6aa..876780b92ef 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java @@ -37,7 +37,6 @@ import java.util.List; -import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.connection.ServerType.SHARD_ROUTER; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; @@ -61,16 +60,16 @@ public DefaultServerConnection retain() { } @Override - public void release() { - super.release(); - if (getCount() == 0) { + public int release() { + int count = super.release(); + if (count == 0) { wrapped.close(); } + return count; } @Override public ConnectionDescription getDescription() { - isTrue("open", getCount() > 0); return wrapped.getDescription(); } diff --git a/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java b/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java index 52bb37bf00a..9968cfc20f5 100644 --- a/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java +++ b/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java @@ -48,8 +48,8 @@ public int getCount() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } @Override diff --git a/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSessionBinding.java b/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSessionBinding.java index 3bcbb2c4774..5a81ab87f56 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSessionBinding.java +++ b/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSessionBinding.java @@ -114,8 +114,8 @@ public AsyncReadWriteBinding retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } private class SessionBindingAsyncConnectionSource implements AsyncConnectionSource { @@ -168,8 +168,8 @@ public AsyncConnectionSource retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSingleConnectionBinding.java b/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSingleConnectionBinding.java index 0d022c1bf38..7eaf062d6c1 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSingleConnectionBinding.java +++ b/driver-core/src/test/functional/com/mongodb/internal/binding/AsyncSingleConnectionBinding.java @@ -202,12 +202,13 @@ public void getWriteConnectionSource(final SingleResultCallback> wrappedBinding def binding = new ClientSessionBinding(session, false, wrappedBinding) when: diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnection.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnection.java index 85942d3b5ae..9497cff75f1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnection.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnection.java @@ -56,7 +56,8 @@ public TestConnection retain() { } @Override - public void release() { + public int release() { + return 1; } @Override diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncQueryBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncQueryBatchCursorSpecification.groovy index 9201d960d67..1270cc17326 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncQueryBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncQueryBatchCursorSpecification.groovy @@ -624,6 +624,7 @@ class AsyncQueryBatchCursorSpecification extends Specification { } else if (counter < 0) { throw new IllegalStateException('Tried to release AsyncConnection below 0') } + counter } mock.getCount() >> { counter } mock @@ -671,6 +672,7 @@ class AsyncQueryBatchCursorSpecification extends Specification { } else if (counter < 0) { throw new IllegalStateException('Tried to release AsyncConnectionSource below 0') } + counter } mock.getCount() >> { counter } mock diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/QueryBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/QueryBatchCursorSpecification.groovy index 571ec5abcf2..e923349d55c 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/QueryBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/QueryBatchCursorSpecification.groovy @@ -299,6 +299,7 @@ class QueryBatchCursorSpecification extends Specification { if (refCounter < 0) { throw new IllegalStateException('Tried to release Connection below 0') } + refCounter; } mockConn.getCount() >> { refCounter } mockConn @@ -328,6 +329,7 @@ class QueryBatchCursorSpecification extends Specification { if (refCounter < 0) { throw new IllegalStateException('Tried to release ConnectionSource below 0') } + refCounter } mockConnectionSource.getCount() >> { refCounter } mockConnectionSource.getConnection() >> { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptBinding.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptBinding.java index f681abd6ef4..cf67fd753b1 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptBinding.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptBinding.java @@ -24,7 +24,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncClusterAwareReadWriteBinding; import com.mongodb.internal.binding.AsyncConnectionSource; -import com.mongodb.internal.binding.AsyncReadWriteBinding; import com.mongodb.internal.connection.AsyncConnection; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.session.SessionContext; @@ -115,14 +114,14 @@ public int getCount() { } @Override - public AsyncReadWriteBinding retain() { + public AsyncClusterAwareReadWriteBinding retain() { wrapped.retain(); return this; } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } @Override @@ -187,8 +186,8 @@ public AsyncConnectionSource retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java index 25c9e41aff1..16606532d15 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java @@ -87,8 +87,8 @@ public CryptConnection retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } @Override diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java index d96f9572b85..f12e0a5a94f 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java @@ -23,6 +23,7 @@ import com.mongodb.client.ClientSession; import com.mongodb.connection.ClusterType; import com.mongodb.connection.ServerDescription; +import com.mongodb.internal.binding.AbstractReferenceCounted; import com.mongodb.internal.binding.ClusterAwareReadWriteBinding; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.ReadWriteBinding; @@ -38,7 +39,7 @@ /** * This class is not part of the public API and may be removed or changed at any time. */ -public class ClientSessionBinding implements ReadWriteBinding { +public class ClientSessionBinding extends AbstractReferenceCounted implements ReadWriteBinding { private final ClusterAwareReadWriteBinding wrapped; private final ClientSession session; private final boolean ownsSession; @@ -46,6 +47,7 @@ public class ClientSessionBinding implements ReadWriteBinding { public ClientSessionBinding(final ClientSession session, final boolean ownsSession, final ClusterAwareReadWriteBinding wrapped) { this.wrapped = wrapped; + wrapped.retain(); this.session = notNull("session", session); this.ownsSession = ownsSession; this.sessionContext = new SyncClientSessionContext(session); @@ -62,21 +64,22 @@ public int getCount() { } @Override - public ReadWriteBinding retain() { - wrapped.retain(); + public ClientSessionBinding retain() { + super.retain(); return this; } @Override - public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); - } + public int release() { + int count = super.release(); + if (count == 0) { + wrapped.release(); + if (ownsSession) { + session.close(); + } - private void closeSessionIfCountIsZero() { - if (getCount() == 0 && ownsSession) { - session.close(); } + return count; } @Override @@ -145,6 +148,7 @@ private class SessionBindingConnectionSource implements ConnectionSource { SessionBindingConnectionSource(final ConnectionSource wrapped) { this.wrapped = wrapped; + ClientSessionBinding.this.retain(); } @Override @@ -202,9 +206,12 @@ public int getCount() { } @Override - public void release() { - wrapped.release(); - closeSessionIfCountIsZero(); + public int release() { + int count = wrapped.release(); + if (count == 0) { + ClientSessionBinding.this.release(); + } + return count; } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/CryptBinding.java b/driver-sync/src/main/com/mongodb/client/internal/CryptBinding.java index 051a9b62aba..28a978698b2 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/CryptBinding.java +++ b/driver-sync/src/main/com/mongodb/client/internal/CryptBinding.java @@ -91,8 +91,8 @@ public ReadWriteBinding retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } @Override @@ -149,8 +149,8 @@ public ConnectionSource retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java index 04ffaf822c2..9af3f84a57f 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java +++ b/driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java @@ -84,8 +84,8 @@ public CryptConnection retain() { } @Override - public void release() { - wrapped.release(); + public int release() { + return wrapped.release(); } @Override