From 8563d1e22351874c4f796808b19fe5d49efac17f Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 24 Mar 2026 11:07:06 -0700 Subject: [PATCH 1/6] Release connection between empty getMore responses in AsyncCommandCursor - Extract getMoreLoop to loop on empty batches while releasing the connection between each getMore, preventing pool starvation from idle tailable cursors (e.g., change streams) - Add test to verify connection is released and re-acquired between consecutive empty getMores JAVA-6142 --- .../operation/AsyncCommandCursor.java | 36 ++++++++++--------- .../operation/AsyncCommandCursorTest.java | 33 ++++++++++++++++- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java index 91286bd520b..3037d10b74b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java @@ -108,11 +108,27 @@ public void next(final OperationContext operationContext, final SingleResultCall commandCursorResult = withEmptyResults(commandCursorResult); funcCallback.onResult(batchResults, null); } else { - getMore(localServerCursor, operationContext, funcCallback); + getMoreLoop(localServerCursor, operationContext, funcCallback); } }, operationContext, callback); } + private void getMoreLoop(final ServerCursor localServerCursor, final OperationContext operationContext, + final SingleResultCallback> funcCallback) { + getMore(localServerCursor, operationContext, (nextBatch, t) -> { + if (t != null) { + funcCallback.onResult(null, t); + } else if (resourceManager.getServerCursor() == null || (nextBatch != null && !nextBatch.isEmpty())) { + commandCursorResult = withEmptyResults(commandCursorResult); + funcCallback.onResult(nextBatch, null); + } else if (!resourceManager.operable()) { + funcCallback.onResult(emptyList(), null); + } else { + getMoreLoop(assertNotNull(resourceManager.getServerCursor()), operationContext, funcCallback); + } + }); + } + @Override public boolean isClosed() { return !resourceManager.operable(); @@ -164,10 +180,10 @@ public int getMaxWireVersion() { private void getMore(final ServerCursor cursor, final OperationContext operationContext, final SingleResultCallback> callback) { resourceManager.executeWithConnection(operationContext, (connection, wrappedCallback) -> - getMoreLoop(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); + getMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); } - private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor, + private void getMoreCommand(final AsyncConnection connection, final ServerCursor serverCursor, final OperationContext operationContext, final SingleResultCallback> callback) { connection.commandAsync(namespace.getDatabaseName(), @@ -188,19 +204,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult)); ServerCursor nextServerCursor = commandCursorResult.getServerCursor(); resourceManager.setServerCursor(nextServerCursor); - List nextBatch = commandCursorResult.getResults(); - if (nextServerCursor == null || !nextBatch.isEmpty()) { - commandCursorResult = withEmptyResults(commandCursorResult); - callback.onResult(nextBatch, null); - return; - } - - if (!resourceManager.operable()) { - callback.onResult(emptyList(), null); - return; - } - - getMoreLoop(connection, nextServerCursor, operationContext, callback); + callback.onResult(commandCursorResult.getResults(), null); }); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java index 464e817d606..b145fd585a3 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java @@ -46,7 +46,11 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertNull; +import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.mockito.ArgumentMatchers.any; @@ -105,7 +109,7 @@ void setUp() { serverDescription = mock(ServerDescription.class); when(operationContext.getTimeoutContext()).thenReturn(timeoutContext); doAnswer(invocation -> { - SingleResultCallback callback = invocation.getArgument(0); + SingleResultCallback callback = invocation.getArgument(1); callback.onResult(mockConnection, null); return null; }).when(connectionSource).getConnection(any(), any()); @@ -199,6 +203,33 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { } + @Test + void shouldReleaseConnectionBetweenEmptyGetMoreResponses() { + AtomicInteger callCount = new AtomicInteger(); + doAnswer(invocation -> { + SingleResultCallback cb = invocation.getArgument(6); + cb.onResult(new BsonDocument("cursor", + new BsonDocument("ns", new BsonString(NAMESPACE.getFullName())) + .append("id", new BsonInt64(callCount.incrementAndGet() < 3 ? 1 : 0)) + .append("nextBatch", new BsonArrayWrapper<>(new BsonArray()))), null); + return null; + }).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), + argThat(doc -> doc.containsKey("getMore")), any(), any(), any(), any(), any()); + + when(serverDescription.getType()).thenReturn(ServerType.STANDALONE); + createBatchCursor().next(operationContext, (result, t) -> { + assertNotNull(result); + assertTrue(result.isEmpty()); + assertNull(t); + }); + + // 2 empty-batch getMores + 1 exhausted getMore = 3 getMores, but the 3rd + // exhausts the cursor (id=0), which makes the cursor to return an empty result. + verify(mockConnection, times(3)).release(); + verify(connectionSource, times(3)).getConnection(any(), any()); + Assertions.assertEquals(3, callCount.get()); + } + private AsyncCursor createBatchCursor() { return new AsyncCommandCursor<>( COMMAND_CURSOR_DOCUMENT, From 7f18d1c71442b75f0e2b78723be9ad477dd25389 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 24 Mar 2026 11:34:10 -0700 Subject: [PATCH 2/6] Clean up AsyncCommandCursor test and fix parameter alignment - Fix parameter alignment in getMoreLoop and getMoreCommand to align with opening parenthesis - Replace com.mongodb.assertions with JUnit 5 Assertions for consistent test assertion usage - Remove unused coreCursor field and its mock from AsyncCommandCursorTest JAVA-6142 --- .../operation/AsyncCommandCursor.java | 12 ++++--- .../operation/AsyncCommandCursorTest.java | 31 +++++++++---------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java index 3037d10b74b..8112db15373 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java @@ -113,8 +113,9 @@ public void next(final OperationContext operationContext, final SingleResultCall }, operationContext, callback); } - private void getMoreLoop(final ServerCursor localServerCursor, final OperationContext operationContext, - final SingleResultCallback> funcCallback) { + private void getMoreLoop(final ServerCursor localServerCursor, + final OperationContext operationContext, + final SingleResultCallback> funcCallback) { getMore(localServerCursor, operationContext, (nextBatch, t) -> { if (t != null) { funcCallback.onResult(null, t); @@ -183,9 +184,10 @@ private void getMore(final ServerCursor cursor, final OperationContext operation getMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); } - private void getMoreCommand(final AsyncConnection connection, final ServerCursor serverCursor, - final OperationContext operationContext, - final SingleResultCallback> callback) { + private void getMoreCommand(final AsyncConnection connection, + final ServerCursor serverCursor, + final OperationContext operationContext, + final SingleResultCallback> callback) { connection.commandAsync(namespace.getDatabaseName(), getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment), NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java index b145fd585a3..644c35917f8 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java @@ -48,11 +48,12 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; -import static com.mongodb.assertions.Assertions.assertNotNull; -import static com.mongodb.assertions.Assertions.assertNull; -import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -84,11 +85,9 @@ class AsyncCommandCursorTest { private OperationContext operationContext; private TimeoutContext timeoutContext; private ServerDescription serverDescription; - private AsyncCursor coreCursor; @BeforeEach void setUp() { - coreCursor = mock(AsyncCursor.class); timeoutContext = spy(new TimeoutContext(TimeoutSettings.create( MongoClientSettings.builder().timeout(TIMEOUT.toMillis(), MILLISECONDS).build()))); operationContext = spy(new OperationContext( @@ -130,9 +129,9 @@ void shouldSkipKillsCursorsCommandWhenNetworkErrorOccurs() { //when commandBatchCursor.next(operationContext, (result, t) -> { - Assertions.assertNull(result); - Assertions.assertNotNull(t); - Assertions.assertEquals(MongoSocketException.class, t.getClass()); + assertNull(result); + assertNotNull(t); + assertEquals(MongoSocketException.class, t.getClass()); }); //then @@ -155,9 +154,9 @@ void shouldNotSkipKillsCursorsCommandWhenTimeoutExceptionDoesNotHaveNetworkError //when commandBatchCursor.next(operationContext, (result, t) -> { - Assertions.assertNull(result); - Assertions.assertNotNull(t); - Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass()); + assertNull(result); + assertNotNull(t); + assertEquals(MongoOperationTimeoutException.class, t.getClass()); }); commandBatchCursor.close(operationContext); @@ -186,9 +185,9 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { //when commandBatchCursor.next(operationContext, (result, t) -> { - Assertions.assertNull(result); - Assertions.assertNotNull(t); - Assertions.assertEquals(MongoOperationTimeoutException.class, t.getClass()); + assertNull(result); + assertNotNull(t); + assertEquals(MongoOperationTimeoutException.class, t.getClass()); }); commandBatchCursor.close(operationContext); @@ -224,10 +223,10 @@ void shouldReleaseConnectionBetweenEmptyGetMoreResponses() { }); // 2 empty-batch getMores + 1 exhausted getMore = 3 getMores, but the 3rd - // exhausts the cursor (id=0), which makes the cursor to return an empty result. + // exhausts the cursor (id=0), which makes the cursor to break the loop and return an empty result. verify(mockConnection, times(3)).release(); verify(connectionSource, times(3)).getConnection(any(), any()); - Assertions.assertEquals(3, callCount.get()); + assertEquals(3, callCount.get()); } private AsyncCursor createBatchCursor() { From c8262f0ae86e107c238a701f3312de2f7bd80114 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 24 Mar 2026 11:40:53 -0700 Subject: [PATCH 3/6] Remove unused import. JAVA-6142 --- .../com/mongodb/internal/operation/AsyncCommandCursorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java index 644c35917f8..eb0a736a9b0 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java @@ -41,7 +41,6 @@ import org.bson.Document; import org.bson.codecs.Decoder; import org.bson.codecs.DocumentCodec; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; From a02012c305dd6d3fb23e99071c99e98dc760b51c Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 24 Mar 2026 11:43:48 -0700 Subject: [PATCH 4/6] Rename method. JAVA-6142 --- .../mongodb/internal/operation/AsyncCommandCursor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java index 8112db15373..35a1429a9d0 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java @@ -181,13 +181,13 @@ public int getMaxWireVersion() { private void getMore(final ServerCursor cursor, final OperationContext operationContext, final SingleResultCallback> callback) { resourceManager.executeWithConnection(operationContext, (connection, wrappedCallback) -> - getMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); + executeGetMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); } - private void getMoreCommand(final AsyncConnection connection, - final ServerCursor serverCursor, - final OperationContext operationContext, - final SingleResultCallback> callback) { + private void executeGetMoreCommand(final AsyncConnection connection, + final ServerCursor serverCursor, + final OperationContext operationContext, + final SingleResultCallback> callback) { connection.commandAsync(namespace.getDatabaseName(), getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment), NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), From 99741231ffa8de823bddc16737442da41d9a2945 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Tue, 24 Mar 2026 15:37:07 -0700 Subject: [PATCH 5/6] Update driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../com/mongodb/internal/operation/AsyncCommandCursorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java index eb0a736a9b0..6d2ef649bce 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandCursorTest.java @@ -222,7 +222,7 @@ void shouldReleaseConnectionBetweenEmptyGetMoreResponses() { }); // 2 empty-batch getMores + 1 exhausted getMore = 3 getMores, but the 3rd - // exhausts the cursor (id=0), which makes the cursor to break the loop and return an empty result. + // exhausts the cursor (id=0), which makes the cursor break the loop and return an empty result. verify(mockConnection, times(3)).release(); verify(connectionSource, times(3)).getConnection(any(), any()); assertEquals(3, callCount.get()); From bcea0ff32bfd7c1c81b4d69ec962f83b23da21ef Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 25 Mar 2026 16:02:19 -0700 Subject: [PATCH 6/6] Fix Spock test for connection-per-getMore behavior - Update AsyncCommandBatchCursorSpecification to use separate connection instances per getMore iteration to match the new connection release/reacquisition behavior - Move getMoreLoop below public accessor methods for better readability JAVA-6142 --- .../operation/AsyncCommandCursor.java | 33 ++++++++++--------- ...syncCommandBatchCursorSpecification.groovy | 19 +++++++---- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java index 35a1429a9d0..6af8b9ec678 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java @@ -113,22 +113,6 @@ public void next(final OperationContext operationContext, final SingleResultCall }, operationContext, callback); } - private void getMoreLoop(final ServerCursor localServerCursor, - final OperationContext operationContext, - final SingleResultCallback> funcCallback) { - getMore(localServerCursor, operationContext, (nextBatch, t) -> { - if (t != null) { - funcCallback.onResult(null, t); - } else if (resourceManager.getServerCursor() == null || (nextBatch != null && !nextBatch.isEmpty())) { - commandCursorResult = withEmptyResults(commandCursorResult); - funcCallback.onResult(nextBatch, null); - } else if (!resourceManager.operable()) { - funcCallback.onResult(emptyList(), null); - } else { - getMoreLoop(assertNotNull(resourceManager.getServerCursor()), operationContext, funcCallback); - } - }); - } @Override public boolean isClosed() { @@ -179,6 +163,23 @@ public int getMaxWireVersion() { return maxWireVersion; } + private void getMoreLoop(final ServerCursor localServerCursor, + final OperationContext operationContext, + final SingleResultCallback> funcCallback) { + getMore(localServerCursor, operationContext, (nextBatch, t) -> { + if (t != null) { + funcCallback.onResult(null, t); + } else if (resourceManager.getServerCursor() == null || (nextBatch != null && !nextBatch.isEmpty())) { + commandCursorResult = withEmptyResults(commandCursorResult); + funcCallback.onResult(nextBatch, null); + } else if (!resourceManager.operable()) { + funcCallback.onResult(emptyList(), null); + } else { + getMoreLoop(assertNotNull(resourceManager.getServerCursor()), operationContext, funcCallback); + } + }); + } + private void getMore(final ServerCursor cursor, final OperationContext operationContext, final SingleResultCallback> callback) { resourceManager.executeWithConnection(operationContext, (connection, wrappedCallback) -> executeGetMoreCommand(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback); diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy index f0b73f24fe1..b9884e3481c 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy @@ -167,8 +167,9 @@ class AsyncCommandBatchCursorSpecification extends Specification { def 'should handle getMore when there are empty results but there is a cursor'() { given: def initialConnection = referenceCountedAsyncConnection() - def connection = referenceCountedAsyncConnection() - def connectionSource = getAsyncConnectionSource(connection) + def connectionA = referenceCountedAsyncConnection() + def connectionB = referenceCountedAsyncConnection() + def connectionSource = getAsyncConnectionSource(connectionA, connectionB) when: def firstBatch = createCommandResult([], CURSOR_ID) @@ -177,14 +178,15 @@ class AsyncCommandBatchCursorSpecification extends Specification { def batch = nextBatch(cursor) then: - 1 * connection.commandAsync(*_) >> { - connection.getCount() == 1 + 1 * connectionA.commandAsync(*_) >> { + connectionA.getCount() == 1 connectionSource.getCount() == 1 it.last().onResult(response, null) } - 1 * connection.commandAsync(*_) >> { - connection.getCount() == 1 + then: + 1 * connectionB.commandAsync(*_) >> { + connectionB.getCount() == 1 connectionSource.getCount() == 1 it.last().onResult(response2, null) } @@ -196,7 +198,10 @@ class AsyncCommandBatchCursorSpecification extends Specification { cursor.close() then: - 0 * connection._ + 0 * connectionA._ + 0 * connectionB._ + connectionA.getCount() == 0 + connectionB.getCount() == 0 initialConnection.getCount() == 0 connectionSource.getCount() == 0