callback) {
beginAsync().thenRun(c -> {
assertTrue(connection.opened());
- authenticationLoopAsync(connection, connection.getDescription(), operationContextWithoutSession(operationContext), c);
+ authenticationLoopAsync(connection, connection.getDescription(), operationContext.withConnectionEstablishmentSessionContext(), c);
}).finish(callback);
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
index 7e0de92da1d..d33dd5007f7 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
@@ -15,7 +15,9 @@
*/
package com.mongodb.internal.connection;
+import com.mongodb.Function;
import com.mongodb.MongoConnectionPoolClearedException;
+import com.mongodb.ReadConcern;
import com.mongodb.RequestContext;
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
@@ -33,6 +35,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.stream.Collectors.toList;
@@ -160,6 +163,29 @@ public ServerDeprioritization getServerDeprioritization() {
return serverDeprioritization;
}
+ public OperationContext withNewlyStartedTimeout() {
+ return withTimeoutContext(timeoutContext.withNewlyStartedTimeout());
+ }
+
+ /**
+ * Create a new OperationContext with a SessionContext that does not send a session ID.
+ *
+ * The driver MUST NOT append a session ID to any command sent during the process of
+ * opening and authenticating a connection.
+ */
+ public OperationContext withConnectionEstablishmentSessionContext() {
+ ReadConcern readConcern = getSessionContext().getReadConcern();
+ return withSessionContext(new ReadConcernAwareNoOpSessionContext(readConcern));
+ }
+
+ public OperationContext withMinRoundTripTime(final ServerDescription serverDescription) {
+ return withTimeoutContext(timeoutContext.withMinRoundTripTime(TimeUnit.NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos())));
+ }
+
+ public OperationContext withOverride(final TimeoutContextOverride timeoutContextOverrideFunction) {
+ return withTimeoutContext(timeoutContextOverrideFunction.apply(timeoutContext));
+ }
+
public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
@@ -220,5 +246,7 @@ private boolean isEnabled(final ClusterType clusterType) {
}
}
}
+
+ public interface TimeoutContextOverride extends Function {}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java
index eeee3a31abd..050a4862b17 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java
@@ -27,6 +27,7 @@
import com.mongodb.SubjectProvider;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
@@ -63,12 +64,18 @@ abstract class SaslAuthenticator extends Authenticator implements SpeculativeAut
}
public void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription,
- final OperationContext operationContext) {
+ final OperationContext originalOperationContext) {
doAsSubject(() -> {
+ OperationContext operationContext = originalOperationContext;
SaslClient saslClient = createSaslClient(connection.getDescription().getServerAddress(), operationContext);
throwIfSaslClientIsNull(saslClient);
try {
- BsonDocument responseDocument = getNextSaslResponse(saslClient, connection, operationContext);
+ BsonDocument responseDocument = connection.opened() ? null : getSpeculativeAuthenticateResponse();
+ if (responseDocument == null) {
+ responseDocument = getNextSaslResponse(saslClient, connection, operationContext);
+ operationContext = operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout);
+ }
+
BsonInt32 conversationId = responseDocument.getInt32("conversationId");
while (!(responseDocument.getBoolean("done")).getValue()) {
@@ -81,7 +88,7 @@ public void authenticate(final InternalConnection connection, final ConnectionDe
}
responseDocument = sendSaslContinue(conversationId, response, connection, operationContext);
- operationContext.getTimeoutContext().resetMaintenanceTimeout();
+ operationContext = operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout);
}
if (!saslClient.isComplete()) {
saslClient.evaluateChallenge((responseDocument.getBinary("payload")).getData());
@@ -117,6 +124,9 @@ void authenticateAsync(final InternalConnection connection, final ConnectionDesc
public abstract String getMechanismName();
+ /**
+ * Does not send any commands to the server
+ */
protected abstract SaslClient createSaslClient(ServerAddress serverAddress, OperationContext operationContext);
protected void appendSaslStartOptions(final BsonDocument saslStartCommand) {
@@ -131,11 +141,6 @@ private void throwIfSaslClientIsNull(@Nullable final SaslClient saslClient) {
private BsonDocument getNextSaslResponse(final SaslClient saslClient, final InternalConnection connection,
final OperationContext operationContext) {
- BsonDocument response = connection.opened() ? null : getSpeculativeAuthenticateResponse();
- if (response != null) {
- return response;
- }
-
try {
byte[] serverResponse = saslClient.hasInitialResponse() ? saslClient.evaluateChallenge(new byte[0]) : null;
return sendSaslStart(serverResponse, connection, operationContext);
@@ -160,7 +165,9 @@ private void getNextSaslResponseAsync(final SaslClient saslClient, final Interna
if (result.getBoolean("done").getValue()) {
verifySaslClientComplete(saslClient, result, errHandlingCallback);
} else {
- new Continuator(saslClient, result, connection, operationContext, errHandlingCallback).start();
+ OperationContext saslContinueOperationContext =
+ operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout);
+ new Continuator(saslClient, result, connection, saslContinueOperationContext, errHandlingCallback).start();
}
});
} else if (response.getBoolean("done").getValue()) {
@@ -232,22 +239,14 @@ private BsonDocument sendSaslStart(@Nullable final byte[] outToken, final Intern
final OperationContext operationContext) {
BsonDocument startDocument = createSaslStartCommandDocument(outToken);
appendSaslStartOptions(startDocument);
- try {
return executeCommand(getMongoCredential().getSource(), startDocument, getClusterConnectionMode(), getServerApi(), connection,
operationContext);
- } finally {
- operationContext.getTimeoutContext().resetMaintenanceTimeout();
- }
}
private BsonDocument sendSaslContinue(final BsonInt32 conversationId, final byte[] outToken, final InternalConnection connection,
final OperationContext operationContext) {
- try {
return executeCommand(getMongoCredential().getSource(), createSaslContinueDocument(conversationId, outToken),
getClusterConnectionMode(), getServerApi(), connection, operationContext);
- } finally {
- operationContext.getTimeoutContext().resetMaintenanceTimeout();
- }
}
private void sendSaslStartAsync(@Nullable final byte[] outToken, final InternalConnection connection,
@@ -256,19 +255,13 @@ private void sendSaslStartAsync(@Nullable final byte[] outToken, final InternalC
appendSaslStartOptions(startDocument);
executeCommandAsync(getMongoCredential().getSource(), startDocument, getClusterConnectionMode(), getServerApi(), connection,
- operationContext, (r, t) -> {
- operationContext.getTimeoutContext().resetMaintenanceTimeout();
- callback.onResult(r, t);
- });
+ operationContext, callback::onResult);
}
private void sendSaslContinueAsync(final BsonInt32 conversationId, final byte[] outToken, final InternalConnection connection,
final OperationContext operationContext, final SingleResultCallback callback) {
executeCommandAsync(getMongoCredential().getSource(), createSaslContinueDocument(conversationId, outToken),
- getClusterConnectionMode(), getServerApi(), connection, operationContext, (r, t) -> {
- operationContext.getTimeoutContext().resetMaintenanceTimeout();
- callback.onResult(r, t);
- });
+ getClusterConnectionMode(), getServerApi(), connection, operationContext, callback::onResult);
}
protected BsonDocument createSaslStartCommandDocument(@Nullable final byte[] outToken) {
@@ -361,7 +354,8 @@ private void continueConversation(final BsonDocument result) {
try {
sendSaslContinueAsync(saslStartDocument.getInt32("conversationId"),
saslClient.evaluateChallenge((result.getBinary("payload")).getData()), connection,
- operationContext, Continuator.this);
+ operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout),
+ Continuator.this);
} catch (SaslException e) {
throw wrapException(e);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
index bc7e6655bc7..1a03ce877f7 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
@@ -51,9 +51,10 @@ public String getCommandName() {
@Override
CommandCreator getCommandCreator() {
return (operationContext, serverDescription, connectionDescription) -> {
- operationContext.getTimeoutContext().resetToDefaultMaxTime();
BsonDocument command = AbortTransactionOperation.super.getCommandCreator()
- .create(operationContext, serverDescription, connectionDescription);
+ .create(operationContext.withOverride(TimeoutContext::withDefaultMaxTime),
+ serverDescription,
+ connectionDescription);
putIfNotNull(command, "recoveryToken", recoveryToken);
return command;
};
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbstractWriteSearchIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbstractWriteSearchIndexOperation.java
index 6ebcfda6dbe..18c24b2ff28 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AbstractWriteSearchIndexOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AbstractWriteSearchIndexOperation.java
@@ -22,6 +22,7 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -45,12 +46,12 @@ abstract class AbstractWriteSearchIndexOperation implements WriteOperation
}
@Override
- public Void execute(final WriteBinding binding) {
- return withConnection(binding, connection -> {
+ public Void execute(final WriteBinding binding, final OperationContext operationContext) {
+ return withConnection(binding, operationContext, (connection, operationContextWithMinRtt) -> {
try {
- executeCommand(binding, namespace.getDatabaseName(), buildCommand(),
+ executeCommand(binding, operationContextWithMinRtt, namespace.getDatabaseName(), buildCommand(),
connection,
- writeConcernErrorTransformer(binding.getOperationContext().getTimeoutContext()));
+ writeConcernErrorTransformer(operationContextWithMinRtt.getTimeoutContext()));
} catch (MongoCommandException mongoCommandException) {
swallowOrThrow(mongoCommandException);
}
@@ -59,20 +60,19 @@ public Void execute(final WriteBinding binding) {
}
@Override
- public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) {
- withAsyncSourceAndConnection(binding::getWriteConnectionSource, false, callback,
- (connectionSource, connection, cb) ->
- executeCommandAsync(binding, namespace.getDatabaseName(), buildCommand(), connection,
- writeConcernErrorTransformerAsync(binding.getOperationContext().getTimeoutContext()), (result, commandExecutionError) -> {
+ public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) {
+ withAsyncSourceAndConnection(binding::getWriteConnectionSource, false, operationContext, callback,
+ (connectionSource, connection, operationContextWithMinRtt, cb) ->
+ executeCommandAsync(binding, operationContextWithMinRtt, namespace.getDatabaseName(), buildCommand(), connection,
+ writeConcernErrorTransformerAsync(operationContextWithMinRtt.getTimeoutContext()), (result, commandExecutionError) -> {
try {
swallowOrThrow(commandExecutionError);
- callback.onResult(result, null);
+ cb.onResult(result, null);
} catch (Throwable mongoCommandException) {
- callback.onResult(null, mongoCommandException);
+ cb.onResult(null, mongoCommandException);
}
}
- )
- );
+ ));
}
/**
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
index 1c9abfc68ca..8bbf52fe9ce 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
@@ -25,6 +25,7 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonValue;
@@ -141,13 +142,13 @@ public String getCommandName() {
}
@Override
- public BatchCursor execute(final ReadBinding binding) {
- return wrapped.execute(binding);
+ public BatchCursor execute(final ReadBinding binding, final OperationContext operationContext) {
+ return wrapped.execute(binding, operationContext);
}
@Override
- public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
- wrapped.executeAsync(binding, callback);
+ public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) {
+ wrapped.executeAsync(binding, operationContext, callback);
}
@Override
@@ -156,7 +157,7 @@ public ReadOperationSimple asExplainableOperation(@Nullable final Explain
}
CommandReadOperation createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) {
- return new CommandReadOperation<>(getNamespace().getDatabaseName(), wrapped.getCommandName(),
+ return new ExplainCommandOperation<>(getNamespace().getDatabaseName(), getCommandName(),
(operationContext, serverDescription, connectionDescription) -> {
BsonDocument command = wrapped.getCommand(operationContext, UNKNOWN_WIRE_VERSION);
applyMaxTimeMS(operationContext.getTimeoutContext(), command);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
index 4c9bc3828b7..2471fdd76e8 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
@@ -47,7 +47,7 @@
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
-import static com.mongodb.internal.operation.OperationHelper.setNonTailableCursorMaxTimeSupplier;
+import static com.mongodb.internal.operation.OperationHelper.applyTimeoutModeToOperationContext;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
@@ -192,16 +192,16 @@ public String getCommandName() {
}
@Override
- public BatchCursor execute(final ReadBinding binding) {
- return executeRetryableRead(binding, namespace.getDatabaseName(),
+ public BatchCursor execute(final ReadBinding binding, final OperationContext operationContext) {
+ return executeRetryableRead(binding, applyTimeoutModeToOperationContext(timeoutMode, operationContext), namespace.getDatabaseName(),
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT),
transformer(), retryReads);
}
@Override
- public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
+ public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) {
SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
- executeRetryableReadAsync(binding, namespace.getDatabaseName(),
+ executeRetryableReadAsync(binding, applyTimeoutModeToOperationContext(timeoutMode, operationContext), namespace.getDatabaseName(),
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT),
asyncTransformer(), retryReads,
errHandlingCallback);
@@ -216,7 +216,6 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi
BsonDocument commandDocument = new BsonDocument(getCommandName(), aggregateTarget.create());
appendReadConcernToCommand(operationContext.getSessionContext(), maxWireVersion, commandDocument);
commandDocument.put("pipeline", pipelineCreator.create());
- setNonTailableCursorMaxTimeSupplier(timeoutMode, operationContext);
BsonDocument cursor = new BsonDocument();
if (batchSize != null) {
cursor.put("batchSize", new BsonInt32(batchSize));
@@ -242,15 +241,19 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi
}
private CommandReadTransformer> transformer() {
- return (result, source, connection) ->
- new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
- getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
+ return (result, source, connection, operationContext) ->
+ new CommandBatchCursor<>(getTimeoutMode(), getMaxTimeForCursor(operationContext.getTimeoutContext()), operationContext, new CommandCursor<>(
+ result, batchSize != null ? batchSize : 0,
+ decoder, comment, source, connection
+ ));
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) ->
- new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
- getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
+ return (result, source, connection, operationContext) ->
+ new AsyncCommandBatchCursor<>(getTimeoutMode(), getMaxTimeForCursor(operationContext.getTimeoutContext()),
+ operationContext, new AsyncCommandCursor<>(
+ result, batchSize != null ? batchSize : 0, decoder, comment, source, connection
+ ));
}
private TimeoutMode getTimeoutMode() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
index 16f33ad45e5..3bf90295dfb 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
@@ -26,6 +26,7 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
@@ -39,8 +40,10 @@
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
+import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.ServerVersionHelper.FIVE_DOT_ZERO_WIRE_VERSION;
+import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand;
import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError;
@@ -158,30 +161,35 @@ public String getCommandName() {
}
@Override
- public Void execute(final ReadBinding binding) {
- return executeRetryableRead(binding,
- () -> binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary()),
- namespace.getDatabaseName(),
- getCommandCreator(),
- new BsonDocumentCodec(), (result, source, connection) -> {
- throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
- connection.getDescription().getMaxWireVersion(), binding.getOperationContext().getTimeoutContext());
- return null;
- }, false);
+ public Void execute(final ReadBinding binding, final OperationContext operationContext) {
+ return executeRetryableRead(
+ operationContext,
+ (serverSelectionOperationContext) ->
+ binding.getReadConnectionSource(
+ FIVE_DOT_ZERO_WIRE_VERSION,
+ ReadPreference.primary(),
+ serverSelectionOperationContext),
+ namespace.getDatabaseName(),
+ getCommandCreator(),
+ new BsonDocumentCodec(),
+ transformer(),
+ false);
}
@Override
- public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback callback) {
- executeRetryableReadAsync(binding,
- (connectionSourceCallback) ->
- binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary(), connectionSourceCallback),
- namespace.getDatabaseName(),
- getCommandCreator(),
- new BsonDocumentCodec(), (result, source, connection) -> {
- throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
- connection.getDescription().getMaxWireVersion(), binding.getOperationContext().getTimeoutContext());
- return null;
- }, false, callback);
+ public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext,
+ final SingleResultCallback callback) {
+ executeRetryableReadAsync(
+ binding,
+ operationContext,
+ (serverSelectionOperationContext, connectionSourceCallback) ->
+ binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary(), serverSelectionOperationContext, connectionSourceCallback),
+ namespace.getDatabaseName(),
+ getCommandCreator(),
+ new BsonDocumentCodec(),
+ asyncTransformer(),
+ false,
+ callback);
}
private CommandOperationHelper.CommandCreator getCommandCreator() {
@@ -220,4 +228,20 @@ private CommandOperationHelper.CommandCreator getCommandCreator() {
return commandDocument;
};
}
+
+ private static CommandReadTransformer transformer() {
+ return (result, source, connection, operationContext) -> {
+ throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
+ connection.getDescription().getMaxWireVersion(), operationContext.getTimeoutContext());
+ return null;
+ };
+ }
+
+ private static CommandReadTransformerAsync asyncTransformer() {
+ return (result, source, connection, operationContext) -> {
+ throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
+ connection.getDescription().getMaxWireVersion(), operationContext.getTimeoutContext());
+ return null;
+ };
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
index a4cfbafedb6..a144888f859 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
@@ -16,12 +16,13 @@
package com.mongodb.internal.operation;
+
import com.mongodb.MongoException;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
-import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -43,7 +44,10 @@
final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBatchCursor {
private final AsyncReadBinding binding;
- private final TimeoutContext timeoutContext;
+ /**
+ * The initial operation context, which is used as an initial state to create new operation contexts for each operation.
+ */
+ private final OperationContext initialOperationContext;
private final ChangeStreamOperation changeStreamOperation;
private final int maxWireVersion;
@@ -53,40 +57,40 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBat
* {@code wrapped} containing {@code null} and {@link #isClosed} being {@code false}.
* This represents a situation in which the wrapped object was closed by {@code this} but {@code this} remained open.
*/
- private final AtomicReference> wrapped;
+ private final AtomicReference> wrapped;
private final AtomicBoolean isClosed;
AsyncChangeStreamBatchCursor(final ChangeStreamOperation changeStreamOperation,
- final AsyncCommandBatchCursor wrapped,
+ final AsyncCursor wrapped,
final AsyncReadBinding binding,
+ final OperationContext operationContext,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
this.changeStreamOperation = changeStreamOperation;
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
this.binding = binding;
binding.retain();
- this.timeoutContext = binding.getOperationContext().getTimeoutContext();
+ this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
isClosed = new AtomicBoolean();
}
@NonNull
- AsyncCommandBatchCursor getWrapped() {
+ AsyncCursor getWrapped() {
return assertNotNull(wrapped.get());
}
@Override
public void next(final SingleResultCallback> callback) {
- resumeableOperation(AsyncBatchCursor::next, callback, false);
+ resumeableOperation(AsyncCursor::next, callback, initialOperationContext.withNewlyStartedTimeout(), false);
}
@Override
public void close() {
- timeoutContext.resetTimeoutIfPresent();
if (isClosed.compareAndSet(false, true)) {
try {
- nullifyAndCloseWrapped();
+ nullifyAndCloseWrapped(initialOperationContext.withNewlyStartedTimeout());
} finally {
binding.release();
}
@@ -116,7 +120,7 @@ public boolean isClosed() {
}
private boolean wrappedClosedItself() {
- AsyncAggregateResponseBatchCursor observedWrapped = wrapped.get();
+ AsyncCursor observedWrapped = wrapped.get();
return observedWrapped != null && observedWrapped.isClosed();
}
@@ -125,10 +129,10 @@ private boolean wrappedClosedItself() {
* if {@link #wrappedClosedItself()} observes a {@linkplain AsyncAggregateResponseBatchCursor#isClosed() closed} wrapped object,
* then it closed itself as opposed to being closed by {@code this}.
*/
- private void nullifyAndCloseWrapped() {
- AsyncAggregateResponseBatchCursor observedWrapped = wrapped.getAndSet(null);
+ private void nullifyAndCloseWrapped(final OperationContext operationContext) {
+ AsyncCursor observedWrapped = wrapped.getAndSet(null);
if (observedWrapped != null) {
- observedWrapped.close();
+ observedWrapped.close(operationContext);
}
}
@@ -137,14 +141,14 @@ private void nullifyAndCloseWrapped() {
* {@code setWrappedOrCloseIt(AsyncCommandBatchCursor)} is called concurrently with or after (in the happens-before order)
* the method {@link #close()}.
*/
- private void setWrappedOrCloseIt(final AsyncCommandBatchCursor newValue) {
+ private void setWrappedOrCloseIt(final AsyncCursor newValue, final OperationContext operationContext) {
if (isClosed()) {
assertNull(wrapped.get());
- newValue.close();
+ newValue.close(operationContext);
} else {
assertNull(wrapped.getAndSet(newValue));
if (isClosed()) {
- nullifyAndCloseWrapped();
+ nullifyAndCloseWrapped(operationContext);
}
}
}
@@ -169,7 +173,7 @@ public int getMaxWireVersion() {
return maxWireVersion;
}
- private void cachePostBatchResumeToken(final AsyncCommandBatchCursor cursor) {
+ private void cachePostBatchResumeToken(final AsyncCursor cursor) {
BsonDocument resumeToken = cursor.getPostBatchResumeToken();
if (resumeToken != null) {
this.resumeToken = resumeToken;
@@ -177,19 +181,22 @@ private void cachePostBatchResumeToken(final AsyncCommandBatchCursor cursor, SingleResultCallback> callback);
+ void apply(AsyncCursor cursor, OperationContext operationContext,
+ SingleResultCallback> callback);
}
- private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback> callback, final boolean tryNext) {
- timeoutContext.resetTimeoutIfPresent();
+ private void resumeableOperation(final AsyncBlock asyncBlock,
+ final SingleResultCallback> callback,
+ final OperationContext operationContext,
+ final boolean tryNext) {
SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
if (isClosed()) {
errHandlingCallback.onResult(null, new MongoException(format("%s called after the cursor was closed.",
tryNext ? "tryNext()" : "next()")));
return;
}
- AsyncCommandBatchCursor wrappedCursor = getWrapped();
- asyncBlock.apply(wrappedCursor, (result, t) -> {
+ AsyncCursor wrappedCursor = getWrapped();
+ asyncBlock.apply(wrappedCursor, operationContext, (result, t) -> {
if (t == null) {
try {
List convertedResults;
@@ -206,8 +213,8 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
} else {
cachePostBatchResumeToken(wrappedCursor);
if (isResumableError(t, maxWireVersion)) {
- nullifyAndCloseWrapped();
- retryOperation(asyncBlock, errHandlingCallback, tryNext);
+ nullifyAndCloseWrapped(operationContext);
+ retryOperation(asyncBlock, errHandlingCallback, operationContext, tryNext);
} else {
errHandlingCallback.onResult(null, t);
}
@@ -215,26 +222,29 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
});
}
- private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback> callback,
+ private void retryOperation(final AsyncBlock asyncBlock,
+ final SingleResultCallback> callback,
+ final OperationContext operationContext,
final boolean tryNext) {
- withAsyncReadConnectionSource(binding, (source, t) -> {
+ withAsyncReadConnectionSource(binding, operationContext, (source, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
assertNotNull(source).getServerDescription().getMaxWireVersion());
source.release();
- changeStreamOperation.executeAsync(binding, (asyncBatchCursor, t1) -> {
+ changeStreamOperation.executeAsync(binding, operationContext, (asyncBatchCursor, t1) -> {
if (t1 != null) {
callback.onResult(null, t1);
} else {
try {
- setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) asyncBatchCursor).getWrapped());
+ setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor) asyncBatchCursor).getWrapped(),
+ operationContext);
} finally {
try {
binding.release(); // release the new change stream batch cursor's reference to the binding
} finally {
- resumeableOperation(asyncBlock, callback, tryNext);
+ resumeableOperation(asyncBlock, callback, operationContext, tryNext);
}
}
}
@@ -243,3 +253,4 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
});
}
}
+
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
index 792c10b4bb2..f390981694e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
@@ -16,357 +16,94 @@
package com.mongodb.internal.operation;
-import com.mongodb.MongoCommandException;
-import com.mongodb.MongoException;
-import com.mongodb.MongoNamespace;
-import com.mongodb.MongoOperationTimeoutException;
-import com.mongodb.MongoSocketException;
-import com.mongodb.ReadPreference;
-import com.mongodb.ServerAddress;
-import com.mongodb.ServerCursor;
-import com.mongodb.annotations.ThreadSafe;
import com.mongodb.client.cursor.TimeoutMode;
-import com.mongodb.connection.ConnectionDescription;
-import com.mongodb.connection.ServerType;
-import com.mongodb.internal.TimeoutContext;
-import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.async.function.AsyncCallbackSupplier;
-import com.mongodb.internal.binding.AsyncConnectionSource;
-import com.mongodb.internal.connection.AsyncConnection;
-import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.OperationContext;
-import com.mongodb.internal.operation.AsyncOperationHelper.AsyncCallableConnectionWithCallback;
-import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
-import org.bson.BsonValue;
-import org.bson.codecs.BsonDocumentCodec;
-import org.bson.codecs.Decoder;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import static com.mongodb.assertions.Assertions.assertNotNull;
-import static com.mongodb.assertions.Assertions.assertTrue;
-import static com.mongodb.assertions.Assertions.doesNotThrow;
-import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
-import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
-import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults;
-import static java.util.Collections.emptyList;
+public class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor {
-class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor {
-
- private final MongoNamespace namespace;
- private final Decoder decoder;
- @Nullable
- private final BsonValue comment;
- private final int maxWireVersion;
- private final boolean firstBatchEmpty;
- private final ResourceManager resourceManager;
- private final OperationContext operationContext;
private final TimeoutMode timeoutMode;
- private final AtomicBoolean processedInitial = new AtomicBoolean();
- private int batchSize;
- private volatile CommandCursorResult commandCursorResult;
- private boolean resetTimeoutWhenClosing;
+ private OperationContext operationContext;
+
+ private AsyncCursor wrapped;
AsyncCommandBatchCursor(
final TimeoutMode timeoutMode,
- final BsonDocument commandCursorDocument,
- final int batchSize, final long maxTimeMS,
- final Decoder decoder,
- @Nullable final BsonValue comment,
- final AsyncConnectionSource connectionSource,
- final AsyncConnection connection) {
- ConnectionDescription connectionDescription = connection.getDescription();
- this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
- this.namespace = commandCursorResult.getNamespace();
- this.batchSize = batchSize;
- this.decoder = decoder;
- this.comment = comment;
- this.maxWireVersion = connectionDescription.getMaxWireVersion();
- this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
- operationContext = connectionSource.getOperationContext();
+ final long maxTimeMs,
+ final OperationContext operationContext,
+ final AsyncCursor wrapped) {
+ this.operationContext = operationContext.withOverride(timeoutContext ->
+ timeoutContext.withMaxTimeOverride(maxTimeMs));
this.timeoutMode = timeoutMode;
-
- operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
-
- AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
- ? connection : null;
- resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
- resetTimeoutWhenClosing = true;
+ this.wrapped = wrapped;
}
@Override
public void next(final SingleResultCallback> callback) {
- resourceManager.execute(funcCallback -> {
- checkTimeoutModeAndResetTimeoutContextIfIteration();
- ServerCursor localServerCursor = resourceManager.getServerCursor();
- boolean serverCursorIsNull = localServerCursor == null;
- List batchResults = emptyList();
- if (!processedInitial.getAndSet(true) && !firstBatchEmpty) {
- batchResults = commandCursorResult.getResults();
- }
-
- if (serverCursorIsNull || !batchResults.isEmpty()) {
- commandCursorResult = withEmptyResults(commandCursorResult);
- funcCallback.onResult(batchResults, null);
- } else {
- getMore(localServerCursor, funcCallback);
- }
- }, callback);
+ resetTimeout();
+ wrapped.next(operationContext, callback);
}
@Override
- public boolean isClosed() {
- return !resourceManager.operable();
+ public void setBatchSize(final int batchSize) {
+ wrapped.setBatchSize(batchSize);
}
@Override
- public void setBatchSize(final int batchSize) {
- this.batchSize = batchSize;
+ public int getBatchSize() {
+ return wrapped.getBatchSize();
}
@Override
- public int getBatchSize() {
- return batchSize;
+ public boolean isClosed() {
+ return wrapped.isClosed();
}
@Override
public void close() {
- resourceManager.close();
+ wrapped.close(operationContext
+ .withOverride(timeoutContext -> timeoutContext
+ .withNewlyStartedTimeout()
+ .withDefaultMaxTime()
+ ));
}
@Nullable
- @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
- ServerCursor getServerCursor() {
- if (!resourceManager.operable()) {
- return null;
- }
- return resourceManager.getServerCursor();
- }
-
@Override
public BsonDocument getPostBatchResumeToken() {
- return commandCursorResult.getPostBatchResumeToken();
+ return wrapped.getPostBatchResumeToken();
}
+ @Nullable
@Override
public BsonTimestamp getOperationTime() {
- return commandCursorResult.getOperationTime();
+ return wrapped.getOperationTime();
}
@Override
public boolean isFirstBatchEmpty() {
- return firstBatchEmpty;
+ return wrapped.isFirstBatchEmpty();
}
@Override
public int getMaxWireVersion() {
- return maxWireVersion;
+ return wrapped.getMaxWireVersion();
}
- void checkTimeoutModeAndResetTimeoutContextIfIteration() {
+ private void resetTimeout() {
if (timeoutMode == TimeoutMode.ITERATION) {
- operationContext.getTimeoutContext().resetTimeoutIfPresent();
+ operationContext = operationContext.withNewlyStartedTimeout();
}
}
- private void getMore(final ServerCursor cursor, final SingleResultCallback> callback) {
- resourceManager.executeWithConnection((connection, wrappedCallback) ->
- getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback);
- }
-
- private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor,
- final SingleResultCallback> callback) {
- connection.commandAsync(namespace.getDatabaseName(),
- getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment),
- NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(),
- CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
- assertNotNull(resourceManager.getConnectionSource()).getOperationContext(),
- (commandResult, t) -> {
- if (t != null) {
- Throwable translatedException =
- t instanceof MongoCommandException
- ? translateCommandException((MongoCommandException) t, serverCursor)
- : t;
- callback.onResult(null, translatedException);
- return;
- }
- commandCursorResult = toCommandCursorResult(
- connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
- ServerCursor nextServerCursor = commandCursorResult.getServerCursor();
- resourceManager.setServerCursor(nextServerCursor);
- List nextBatch = commandCursorResult.getResults();
- if (nextServerCursor == null || !nextBatch.isEmpty()) {
- commandCursorResult = withEmptyResults(commandCursorResult);
- callback.onResult(nextBatch, null);
- return;
- }
-
- if (!resourceManager.operable()) {
- callback.onResult(emptyList(), null);
- return;
- }
-
- getMoreLoop(connection, nextServerCursor, callback);
- });
- }
-
- private CommandCursorResult toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
- final BsonDocument commandCursorDocument) {
- CommandCursorResult commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
- commandCursorDocument);
- logCommandCursorResult(commandCursorResult);
- return commandCursorResult;
- }
-
- /**
- * Configures the cursor to {@link #close()}
- * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
- * This is useful when managing the {@link #close()} behavior externally.
- */
- AsyncCommandBatchCursor disableTimeoutResetWhenClosing() {
- resetTimeoutWhenClosing = false;
- return this;
- }
-
- @ThreadSafe
- private final class ResourceManager extends CursorResourceManager {
- ResourceManager(
- final MongoNamespace namespace,
- final AsyncConnectionSource connectionSource,
- @Nullable final AsyncConnection connectionToPin,
- @Nullable final ServerCursor serverCursor) {
- super(namespace, connectionSource, connectionToPin, serverCursor);
- }
-
- /**
- * Thread-safe.
- * Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
- */
- void execute(final AsyncCallbackSupplier operation, final SingleResultCallback callback) {
- boolean canStartOperation = doesNotThrow(this::tryStartOperation);
- if (!canStartOperation) {
- callback.onResult(null, new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR));
- } else {
- operation.whenComplete(() -> {
- endOperation();
- if (super.getServerCursor() == null) {
- // At this point all resources have been released,
- // but `isClose` may still be returning `false` if `close` have not been called.
- // Self-close to update the state managed by `ResourceManger`, and so that `isClosed` return `true`.
- close();
- }
- }).get(callback);
- }
- }
-
- @Override
- void markAsPinned(final AsyncConnection connectionToPin, final Connection.PinningMode pinningMode) {
- connectionToPin.markAsPinned(pinningMode);
- }
-
- @Override
- void doClose() {
- TimeoutContext timeoutContext = operationContext.getTimeoutContext();
- timeoutContext.resetToDefaultMaxTime();
- SingleResultCallback thenDoNothing = (r, t) -> {};
- if (resetTimeoutWhenClosing) {
- timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing);
- } else {
- releaseResourcesAsync(thenDoNothing);
- }
- }
-
- private void releaseResourcesAsync(final SingleResultCallback callback) {
- beginAsync().thenRunTryCatchAsyncBlocks(c -> {
- if (isSkipReleasingServerResourcesOnClose()) {
- unsetServerCursor();
- }
- if (super.getServerCursor() != null) {
- beginAsync().thenSupply(c2 -> {
- getConnection(c2);
- }).thenConsume((connection, c3) -> {
- beginAsync().thenRun(c4 -> {
- releaseServerResourcesAsync(connection, c4);
- }).thenAlwaysRunAndFinish(() -> {
- connection.release();
- }, c3);
- }).finish(c);
- } else {
- c.complete(c);
- }
- }, MongoException.class, (e, c5) -> {
- c5.complete(c5); // ignore exceptions when releasing server resources
- }).thenAlwaysRunAndFinish(() -> {
- // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
- unsetServerCursor();
- releaseClientResources();
- }, callback);
- }
-
- void executeWithConnection(final AsyncCallableConnectionWithCallback callable, final SingleResultCallback callback) {
- getConnection((connection, t) -> {
- if (t != null) {
- callback.onResult(null, t);
- return;
- }
- callable.call(assertNotNull(connection), (result, t1) -> {
- if (t1 != null) {
- handleException(connection, t1);
- }
- connection.release();
- callback.onResult(result, t1);
- });
- });
- }
-
- private void handleException(final AsyncConnection connection, final Throwable exception) {
- if (exception instanceof MongoOperationTimeoutException && exception.getCause() instanceof MongoSocketException) {
- onCorruptedConnection(connection, (MongoSocketException) exception.getCause());
- } else if (exception instanceof MongoSocketException) {
- onCorruptedConnection(connection, (MongoSocketException) exception);
- }
- }
-
- private void getConnection(final SingleResultCallback callback) {
- assertTrue(getState() != State.IDLE);
- AsyncConnection pinnedConnection = getPinnedConnection();
- if (pinnedConnection != null) {
- callback.onResult(assertNotNull(pinnedConnection).retain(), null);
- } else {
- assertNotNull(getConnectionSource()).getConnection(callback);
- }
- }
-
- private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback callback) {
- beginAsync().thenRun((c) -> {
- ServerCursor localServerCursor = super.getServerCursor();
- if (localServerCursor != null) {
- killServerCursorAsync(getNamespace(), localServerCursor, connection, callback);
- } else {
- c.complete(c);
- }
- }).thenAlwaysRunAndFinish(() -> {
- unsetServerCursor();
- }, callback);
- }
-
- private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor,
- final AsyncConnection localConnection, final SingleResultCallback callback) {
- localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
- NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
- operationContext, (r, t) -> callback.onResult(null, null));
- }
+ AsyncCursor getWrapped() {
+ return wrapped;
}
}
+
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java
new file mode 100644
index 00000000000..91286bd520b
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandCursor.java
@@ -0,0 +1,350 @@
+package com.mongodb.internal.operation;
+
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoOperationTimeoutException;
+import com.mongodb.MongoSocketException;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.connection.ServerType;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.async.function.AsyncCallbackSupplier;
+import com.mongodb.internal.binding.AsyncConnectionSource;
+import com.mongodb.internal.connection.AsyncConnection;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.internal.connection.OperationContext;
+import com.mongodb.internal.operation.AsyncOperationHelper.AsyncCallableConnectionWithCallback;
+import com.mongodb.internal.validator.NoOpFieldNameValidator;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonDocumentCodec;
+import org.bson.codecs.Decoder;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.doesNotThrow;
+import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
+import static com.mongodb.internal.async.SingleResultCallback.THEN_DO_NOTHING;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
+import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults;
+import static java.util.Collections.emptyList;
+
+class AsyncCommandCursor implements AsyncCursor {
+
+ private final MongoNamespace namespace;
+ private final Decoder decoder;
+ @Nullable
+ private final BsonValue comment;
+ private final int maxWireVersion;
+ private final boolean firstBatchEmpty;
+ private final ResourceManager resourceManager;
+ private final AtomicBoolean processedInitial = new AtomicBoolean();
+ private int batchSize;
+ private volatile CommandCursorResult commandCursorResult;
+
+ AsyncCommandCursor(
+ final BsonDocument commandCursorDocument,
+ final int batchSize,
+ final Decoder decoder,
+ @Nullable final BsonValue comment,
+ final AsyncConnectionSource connectionSource,
+ final AsyncConnection connection) {
+ ConnectionDescription connectionDescription = connection.getDescription();
+ this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
+ this.namespace = commandCursorResult.getNamespace();
+ this.batchSize = batchSize;
+ this.decoder = decoder;
+ this.comment = comment;
+ this.maxWireVersion = connectionDescription.getMaxWireVersion();
+ this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
+ AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
+ ? connection : null;
+ resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
+ }
+
+ @Override
+ public void next(final OperationContext operationContext, final SingleResultCallback> callback) {
+ resourceManager.execute(funcCallback -> {
+ ServerCursor localServerCursor = resourceManager.getServerCursor();
+ boolean serverCursorIsNull = localServerCursor == null;
+ List batchResults = emptyList();
+ if (!processedInitial.getAndSet(true) && !firstBatchEmpty) {
+ batchResults = commandCursorResult.getResults();
+ }
+
+ if (serverCursorIsNull || !batchResults.isEmpty()) {
+ commandCursorResult = withEmptyResults(commandCursorResult);
+ funcCallback.onResult(batchResults, null);
+ } else {
+ getMore(localServerCursor, operationContext, funcCallback);
+ }
+ }, operationContext, callback);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !resourceManager.operable();
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public void close(final OperationContext operationContext) {
+ resourceManager.close(operationContext);
+ }
+
+ @Nullable
+ @Override
+ public ServerCursor getServerCursor() {
+ if (!resourceManager.operable()) {
+ return null;
+ }
+ return resourceManager.getServerCursor();
+ }
+
+ @Override
+ public BsonDocument getPostBatchResumeToken() {
+ return commandCursorResult.getPostBatchResumeToken();
+ }
+
+ @Override
+ public BsonTimestamp getOperationTime() {
+ return commandCursorResult.getOperationTime();
+ }
+
+ @Override
+ public boolean isFirstBatchEmpty() {
+ return firstBatchEmpty;
+ }
+
+ @Override
+ public int getMaxWireVersion() {
+ return maxWireVersion;
+ }
+
+ private void getMore(final ServerCursor cursor, final OperationContext operationContext, final SingleResultCallback> callback) {
+ resourceManager.executeWithConnection(operationContext, (connection, wrappedCallback) ->
+ getMoreLoop(assertNotNull(connection), cursor, operationContext, wrappedCallback), callback);
+ }
+
+ private void getMoreLoop(final AsyncConnection connection, final ServerCursor serverCursor,
+ final OperationContext operationContext,
+ final SingleResultCallback> callback) {
+ connection.commandAsync(namespace.getDatabaseName(),
+ getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize, comment),
+ NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(),
+ CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
+ operationContext,
+ (commandResult, t) -> {
+ if (t != null) {
+ Throwable translatedException =
+ t instanceof MongoCommandException
+ ? translateCommandException((MongoCommandException) t, serverCursor)
+ : t;
+ callback.onResult(null, translatedException);
+ return;
+ }
+ commandCursorResult = toCommandCursorResult(
+ connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
+ ServerCursor nextServerCursor = commandCursorResult.getServerCursor();
+ resourceManager.setServerCursor(nextServerCursor);
+ List nextBatch = commandCursorResult.getResults();
+ if (nextServerCursor == null || !nextBatch.isEmpty()) {
+ commandCursorResult = withEmptyResults(commandCursorResult);
+ callback.onResult(nextBatch, null);
+ return;
+ }
+
+ if (!resourceManager.operable()) {
+ callback.onResult(emptyList(), null);
+ return;
+ }
+
+ getMoreLoop(connection, nextServerCursor, operationContext, callback);
+ });
+ }
+
+ private CommandCursorResult toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ CommandCursorResult commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
+ commandCursorDocument);
+ logCommandCursorResult(commandCursorResult);
+ return commandCursorResult;
+ }
+
+ @ThreadSafe
+ private final class ResourceManager extends CursorResourceManager {
+ ResourceManager(
+ final MongoNamespace namespace,
+ final AsyncConnectionSource connectionSource,
+ @Nullable final AsyncConnection connectionToPin,
+ @Nullable final ServerCursor serverCursor) {
+ super(namespace, connectionSource, connectionToPin, serverCursor);
+ }
+
+ /**
+ * Thread-safe.
+ */
+ void execute(final AsyncCallbackSupplier operation, final OperationContext operationContext, final SingleResultCallback callback) {
+ boolean canStartOperation = doesNotThrow(this::tryStartOperation);
+ if (!canStartOperation) {
+ callback.onResult(null, new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR));
+ } else {
+ operation.whenComplete(() -> {
+ endOperation(operationContext);
+ if (super.getServerCursor() == null) {
+ // At this point all resources have been released,
+ // but `isClose` may still be returning `false` if `close` have not been called.
+ // Self-close to update the state managed by `ResourceManger`, and so that `isClosed` return `true`.
+ close(operationContext);
+ }
+ }).get(callback);
+ }
+ }
+
+ @Override
+ void markAsPinned(final AsyncConnection connectionToPin, final Connection.PinningMode pinningMode) {
+ connectionToPin.markAsPinned(pinningMode);
+ }
+
+ @Override
+ void doClose(final OperationContext operationContext) {
+ releaseResourcesAsync(operationContext, THEN_DO_NOTHING);
+ }
+
+ private void releaseResourcesAsync(final OperationContext operationContext, final SingleResultCallback callback) {
+ beginAsync().thenRunTryCatchAsyncBlocks(c -> {
+ if (isSkipReleasingServerResourcesOnClose()) {
+ unsetServerCursor();
+ }
+ if (super.getServerCursor() != null) {
+ beginAsync().thenSupply(c2 -> {
+ getConnection(operationContext, c2);
+ }).thenConsume((connection, c3) -> {
+ beginAsync().thenRun(c4 -> {
+ releaseServerResourcesAsync(connection, operationContext, c4);
+ }).thenAlwaysRunAndFinish(() -> {
+ connection.release();
+ }, c3);
+ }).finish(c);
+ } else {
+ c.complete(c);
+ }
+ }, MongoException.class, (e, c5) -> {
+ c5.complete(c5); // ignore exceptions when releasing server resources
+ }).thenAlwaysRunAndFinish(() -> {
+ // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
+ unsetServerCursor();
+ releaseClientResources();
+ }, callback);
+ }
+
+ void executeWithConnection(final OperationContext operationContext, final AsyncCallableConnectionWithCallback callable,
+ final SingleResultCallback callback) {
+ getConnection(operationContext, (connection, t) -> {
+ if (t != null) {
+ callback.onResult(null, t);
+ return;
+ }
+ callable.call(assertNotNull(connection), (result, t1) -> {
+ if (t1 != null) {
+ handleException(connection, t1);
+ }
+ connection.release();
+ callback.onResult(result, t1);
+ });
+ });
+ }
+
+ private void handleException(final AsyncConnection connection, final Throwable exception) {
+ if (exception instanceof MongoOperationTimeoutException && exception.getCause() instanceof MongoSocketException) {
+ onCorruptedConnection(connection, (MongoSocketException) exception.getCause());
+ } else if (exception instanceof MongoSocketException) {
+ onCorruptedConnection(connection, (MongoSocketException) exception);
+ }
+ }
+
+ private void getConnection(final OperationContext operationContext, final SingleResultCallback callback) {
+ assertTrue(getState() != State.IDLE);
+ AsyncConnection pinnedConnection = getPinnedConnection();
+ if (pinnedConnection != null) {
+ callback.onResult(assertNotNull(pinnedConnection).retain(), null);
+ } else {
+ assertNotNull(getConnectionSource()).getConnection(operationContext, callback);
+ }
+ }
+
+ private void releaseServerResourcesAsync(final AsyncConnection connection, final OperationContext operationContext,
+ final SingleResultCallback callback) {
+ beginAsync().thenRun((c) -> {
+ ServerCursor localServerCursor = super.getServerCursor();
+ if (localServerCursor != null) {
+ killServerCursorAsync(getNamespace(), localServerCursor, connection, operationContext, callback);
+ } else {
+ c.complete(c);
+ }
+ }).thenAlwaysRunAndFinish(() -> {
+ unsetServerCursor();
+ }, callback);
+ }
+
+ private void killServerCursorAsync(
+ final MongoNamespace namespace,
+ final ServerCursor localServerCursor,
+ final AsyncConnection localConnection,
+ final OperationContext operationContext,
+ final SingleResultCallback callback) {
+ beginAsync().thenRun(c -> {
+ localConnection.commandAsync(
+ namespace.getDatabaseName(),
+ getKillCursorsCommand(namespace, localServerCursor),
+ NoOpFieldNameValidator.INSTANCE,
+ ReadPreference.primary(),
+ new BsonDocumentCodec(),
+ operationContext,
+ (r, t) -> c.complete(c));
+ }).finish(callback);
+ }
+ }
+}
+
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCursor.java
new file mode 100644
index 00000000000..3028057c7ee
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCursor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.ServerCursor;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.connection.OperationContext;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+
+import java.util.List;
+
+public interface AsyncCursor {
+ void close(OperationContext operationContext);
+ void next(OperationContext operationContext, SingleResultCallback> callback);
+
+ /**
+ * Sets the batch size to use when requesting the next batch. This is the number of documents to request in the next batch.
+ *
+ * @param batchSize the non-negative batch size. 0 means to use the server default.
+ */
+ void setBatchSize(int batchSize);
+
+ /**
+ * Gets the batch size to use when requesting the next batch. This is the number of documents to request in the next batch.
+ *
+ * @return the non-negative batch size. 0 means to use the server default.
+ */
+ int getBatchSize();
+
+ @Nullable
+ ServerCursor getServerCursor();
+
+
+ @Nullable
+ BsonDocument getPostBatchResumeToken();
+
+ @Nullable
+ BsonTimestamp getOperationTime();
+
+ boolean isFirstBatchEmpty();
+
+ int getMaxWireVersion();
+
+
+ /**
+ * Implementations of {@link AsyncBatchCursor} are allowed to close themselves, see {@link #close()} for more details.
+ *
+ * @return {@code true} if {@code this} has been closed or has closed itself.
+ */
+ boolean isClosed();
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
index f158b3944ae..3063826fbe8 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
@@ -21,12 +21,13 @@
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.cursor.TimeoutMode;
+import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.async.function.AsyncCallbackBiFunction;
import com.mongodb.internal.async.function.AsyncCallbackFunction;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
+import com.mongodb.internal.async.function.AsyncCallbackTriFunction;
import com.mongodb.internal.async.function.RetryState;
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
import com.mongodb.internal.binding.AsyncConnectionSource;
@@ -62,7 +63,7 @@
final class AsyncOperationHelper {
interface AsyncCallableWithConnection {
- void call(@Nullable AsyncConnection connection, @Nullable Throwable t);
+ void call(@Nullable AsyncConnection connection, OperationContext operationContext, @Nullable Throwable t);
}
interface AsyncCallableConnectionWithCallback {
@@ -74,59 +75,75 @@ interface AsyncCallableWithSource {
}
interface CommandWriteTransformerAsync {
-
- /**
- * Yield an appropriate result object for the input object.
- *
- * @param t the input object
- * @return the function result
- */
@Nullable
R apply(T t, AsyncConnection connection);
}
interface CommandReadTransformerAsync {
-
- /**
- * Yield an appropriate result object for the input object.
- *
- * @param t the input object
- * @return the function result
- */
@Nullable
- R apply(T t, AsyncConnectionSource source, AsyncConnection connection);
+ R apply(T t, AsyncConnectionSource source, AsyncConnection connection, OperationContext operationContext);
}
- static void withAsyncReadConnectionSource(final AsyncReadBinding binding, final AsyncCallableWithSource callable) {
- binding.getReadConnectionSource(errorHandlingCallback(new AsyncCallableWithSourceCallback(callable), OperationHelper.LOGGER));
+ static void withAsyncReadConnectionSource(final AsyncReadBinding binding, final OperationContext operationContext,
+ final AsyncCallableWithSource callable) {
+ binding.getReadConnectionSource(operationContext,
+ errorHandlingCallback(new AsyncCallableWithSourceCallback(callable), OperationHelper.LOGGER));
}
- static void withAsyncConnection(final AsyncWriteBinding binding, final AsyncCallableWithConnection callable) {
- binding.getWriteConnectionSource(errorHandlingCallback(new AsyncCallableWithConnectionCallback(callable), OperationHelper.LOGGER));
+ static void withAsyncConnection(final AsyncWriteBinding binding,
+ final OperationContext originalOperationContext,
+ final AsyncCallableWithConnection callable) {
+ OperationContext serverSelectionOperationContext = originalOperationContext.withOverride(TimeoutContext::withComputedServerSelectionTimeout);
+ binding.getWriteConnectionSource(
+ serverSelectionOperationContext,
+ errorHandlingCallback(
+ new AsyncCallableWithConnectionCallback(callable, serverSelectionOperationContext, originalOperationContext),
+ OperationHelper.LOGGER));
}
/**
- * @see #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction)
+ * @see #withAsyncSuppliedResource(AsyncCallbackFunction, boolean, OperationContext, SingleResultCallback, AsyncCallbackFunction)
*/
- static void withAsyncSourceAndConnection(final AsyncCallbackSupplier sourceSupplier,
- final boolean wrapConnectionSourceException, final SingleResultCallback callback,
- final AsyncCallbackBiFunction asyncFunction)
+ static void withAsyncSourceAndConnection(
+ final AsyncCallbackFunction sourceAsyncFunction,
+ final boolean wrapConnectionSourceException,
+ final OperationContext operationContext,
+ final SingleResultCallback callback,
+ final AsyncCallbackTriFunction asyncFunction)
throws OperationHelper.ResourceSupplierInternalException {
SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER);
- withAsyncSuppliedResource(sourceSupplier, wrapConnectionSourceException, errorHandlingCallback,
+
+ OperationContext serverSelectionOperationContext =
+ operationContext.withOverride(TimeoutContext::withComputedServerSelectionTimeout);
+ withAsyncSuppliedResource(
+ sourceAsyncFunction,
+ wrapConnectionSourceException,
+ serverSelectionOperationContext,
+ errorHandlingCallback,
(source, sourceReleasingCallback) ->
- withAsyncSuppliedResource(source::getConnection, wrapConnectionSourceException, sourceReleasingCallback,
+ withAsyncSuppliedResource(
+ source::getConnection,
+ wrapConnectionSourceException,
+ serverSelectionOperationContext.withMinRoundTripTime(source.getServerDescription()),
+ sourceReleasingCallback,
(connection, connectionAndSourceReleasingCallback) ->
- asyncFunction.apply(source, connection, connectionAndSourceReleasingCallback)));
+ asyncFunction.apply(
+ source,
+ connection,
+ operationContext.withMinRoundTripTime(source.getServerDescription()),
+ connectionAndSourceReleasingCallback)));
}
- static void withAsyncSuppliedResource(final AsyncCallbackSupplier resourceSupplier,
- final boolean wrapSourceConnectionException, final SingleResultCallback callback,
- final AsyncCallbackFunction function) throws OperationHelper.ResourceSupplierInternalException {
+ static void withAsyncSuppliedResource(final AsyncCallbackFunction resourceSupplier,
+ final boolean wrapSourceConnectionException,
+ final OperationContext operationContext,
+ final SingleResultCallback callback,
+ final AsyncCallbackFunction function)
+ throws OperationHelper.ResourceSupplierInternalException {
SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER);
- resourceSupplier.get((resource, supplierException) -> {
+ resourceSupplier.apply(operationContext, (resource, supplierException) -> {
if (supplierException != null) {
if (wrapSourceConnectionException) {
supplierException = new OperationHelper.ResourceSupplierInternalException(supplierException);
@@ -144,57 +161,47 @@ static void withAsyncSuppliedResource(final Asyn
});
}
- static void withAsyncConnectionSourceCallableConnection(final AsyncConnectionSource source,
- final AsyncCallableWithConnection callable) {
- source.getConnection((connection, t) -> {
- source.release();
- if (t != null) {
- callable.call(null, t);
- } else {
- callable.call(connection, null);
- }
- });
- }
-
- static void withAsyncConnectionSource(final AsyncConnectionSource source, final AsyncCallableWithSource callable) {
+ static void withAsyncConnectionSource(final AsyncConnectionSource source,
+ final AsyncCallableWithSource callable) {
callable.call(source, null);
}
static void executeRetryableReadAsync(
final AsyncReadBinding binding,
+ final OperationContext operationContext,
final String database,
final CommandCreator commandCreator,
final Decoder decoder,
final CommandReadTransformerAsync transformer,
final boolean retryReads,
final SingleResultCallback callback) {
- executeRetryableReadAsync(binding, binding::getReadConnectionSource, database, commandCreator,
+ executeRetryableReadAsync(binding, operationContext, binding::getReadConnectionSource, database, commandCreator,
decoder, transformer, retryReads, callback);
}
static void executeRetryableReadAsync(
final AsyncReadBinding binding,
- final AsyncCallbackSupplier sourceAsyncSupplier,
+ final OperationContext operationContext,
+ final AsyncCallbackFunction sourceAsyncFunction,
final String database,
final CommandCreator commandCreator,
final Decoder decoder,
final CommandReadTransformerAsync transformer,
final boolean retryReads,
final SingleResultCallback callback) {
- RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
+ RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
binding.retain();
- OperationContext operationContext = binding.getOperationContext();
- AsyncCallbackSupplier asyncRead = decorateReadWithRetriesAsync(retryState, binding.getOperationContext(),
+ AsyncCallbackSupplier asyncRead = decorateReadWithRetriesAsync(retryState, operationContext,
(AsyncCallbackSupplier) funcCallback ->
- withAsyncSourceAndConnection(sourceAsyncSupplier, false, funcCallback,
- (source, connection, releasingCallback) -> {
+ withAsyncSourceAndConnection(sourceAsyncFunction, false, operationContext, funcCallback,
+ (source, connection, operationContextWithMinRtt, releasingCallback) -> {
if (retryState.breakAndCompleteIfRetryAnd(
() -> !OperationHelper.canRetryRead(source.getServerDescription(),
- operationContext),
+ operationContextWithMinRtt),
releasingCallback)) {
return;
}
- createReadCommandAndExecuteAsync(retryState, operationContext, source, database,
+ createReadCommandAndExecuteAsync(retryState, operationContextWithMinRtt, source, database,
commandCreator, decoder, transformer, connection, releasingCallback);
})
).whenComplete(binding::release);
@@ -203,20 +210,31 @@ static void executeRetryableReadAsync(
static void executeCommandAsync(
final AsyncWriteBinding binding,
+ final OperationContext operationContext,
final String database,
final CommandCreator commandCreator,
final CommandWriteTransformerAsync transformer,
final SingleResultCallback callback) {
Assertions.notNull("binding", binding);
- withAsyncSourceAndConnection(binding::getWriteConnectionSource, false, callback,
- (source, connection, releasingCallback) ->
- executeCommandAsync(binding, database, commandCreator.create(
- binding.getOperationContext(), source.getServerDescription(), connection.getDescription()),
- connection, transformer, releasingCallback)
- );
+ withAsyncSourceAndConnection(
+ binding::getWriteConnectionSource,
+ false,
+ operationContext,
+ callback,
+ (source, connection, operationContextWithMinRtt, releasingCallback) ->
+ executeCommandAsync(
+ binding,
+ operationContextWithMinRtt,
+ database,
+ commandCreator.create(
+ operationContextWithMinRtt, source.getServerDescription(), connection.getDescription()),
+ connection,
+ transformer,
+ releasingCallback));
}
static void executeCommandAsync(final AsyncWriteBinding binding,
+ final OperationContext operationContext,
final String database,
final BsonDocument command,
final AsyncConnection connection,
@@ -226,11 +244,12 @@ static void executeCommandAsync(final AsyncWriteBinding binding,
SingleResultCallback addingRetryableLabelCallback = addingRetryableLabelCallback(callback,
connection.getDescription().getMaxWireVersion());
connection.commandAsync(database, command, NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
- binding.getOperationContext(), transformingWriteCallback(transformer, connection, addingRetryableLabelCallback));
+ operationContext, transformingWriteCallback(transformer, connection, addingRetryableLabelCallback));
}
static void executeRetryableWriteAsync(
final AsyncWriteBinding binding,
+ final OperationContext operationContext,
final String database,
@Nullable final ReadPreference readPreference,
final FieldNameValidator fieldNameValidator,
@@ -240,9 +259,8 @@ static void executeRetryableWriteAsync(
final Function retryCommandModifier,
final SingleResultCallback callback) {
- RetryState retryState = initialRetryState(true, binding.getOperationContext().getTimeoutContext());
+ RetryState retryState = initialRetryState(true, operationContext.getTimeoutContext());
binding.retain();
- OperationContext operationContext = binding.getOperationContext();
AsyncCallbackSupplier asyncWrite = decorateWriteWithRetriesAsync(retryState, operationContext,
(AsyncCallbackSupplier) funcCallback -> {
@@ -250,14 +268,14 @@ static void executeRetryableWriteAsync(
if (!firstAttempt && operationContext.getSessionContext().hasActiveTransaction()) {
operationContext.getSessionContext().clearTransactionContext();
}
- withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
- (source, connection, releasingCallback) -> {
+ withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, operationContext, funcCallback,
+ (source, connection, operationContextWithMinRtt, releasingCallback) -> {
int maxWireVersion = connection.getDescription().getMaxWireVersion();
SingleResultCallback addingRetryableLabelCallback = firstAttempt
? releasingCallback
: addingRetryableLabelCallback(releasingCallback, maxWireVersion);
if (retryState.breakAndCompleteIfRetryAnd(() ->
- !OperationHelper.canRetryWrite(connection.getDescription(), operationContext.getSessionContext()),
+ !OperationHelper.canRetryWrite(connection.getDescription(), operationContextWithMinRtt.getSessionContext()),
addingRetryableLabelCallback)) {
return;
}
@@ -268,7 +286,7 @@ static void executeRetryableWriteAsync(
Assertions.assertFalse(firstAttempt);
return retryCommandModifier.apply(previousAttemptCommand);
}).orElseGet(() -> commandCreator.create(
- operationContext,
+ operationContextWithMinRtt,
source.getServerDescription(),
connection.getDescription()));
// attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry
@@ -281,7 +299,8 @@ static void executeRetryableWriteAsync(
return;
}
connection.commandAsync(database, command, fieldNameValidator, readPreference, commandResultDecoder,
- operationContext, transformingWriteCallback(transformer, connection, addingRetryableLabelCallback));
+ operationContextWithMinRtt,
+ transformingWriteCallback(transformer, connection, addingRetryableLabelCallback));
});
}).whenComplete(binding::release);
@@ -307,7 +326,7 @@ static void createReadCommandAndExecuteAsync(
return;
}
connection.commandAsync(database, command, NoOpFieldNameValidator.INSTANCE, source.getReadPreference(), decoder,
- operationContext, transformingReadCallback(transformer, source, connection, callback));
+ operationContext, transformingReadCallback(transformer, source, connection, operationContext, callback));
}
static AsyncCallbackSupplier decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
@@ -339,14 +358,21 @@ static CommandWriteTransformerAsync writeConcernErrorTransfo
}
static CommandReadTransformerAsync> asyncSingleBatchCursorTransformer(final String fieldName) {
- return (result, source, connection) ->
+ return (result, source, connection, operationContext) ->
new AsyncSingleBatchCursor<>(BsonDocumentWrapperHelper.toList(result, fieldName), 0);
}
- static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
- final int batchSize, final Decoder decoder, @Nullable final BsonValue comment, final AsyncConnectionSource source,
- final AsyncConnection connection) {
- return new AsyncCommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection);
+ static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode,
+ final BsonDocument cursorDocument,
+ final int batchSize,
+ final Decoder decoder,
+ @Nullable final BsonValue comment,
+ final AsyncConnectionSource source,
+ final AsyncConnection connection,
+ final OperationContext operationContext) {
+ return new AsyncCommandBatchCursor<>(timeoutMode, 0, operationContext, new AsyncCommandCursor<>(
+ cursorDocument, batchSize, decoder, comment, source, connection
+ ));
}
static SingleResultCallback releasingCallback(final SingleResultCallback wrapped, final AsyncConnection connection) {
@@ -388,19 +414,37 @@ private static SingleResultCallback transformingWriteCallback(final Co
private static class AsyncCallableWithConnectionCallback implements SingleResultCallback {
private final AsyncCallableWithConnection callable;
+ private final OperationContext serverSelectionOperationContext;
+ private final OperationContext originalOperationContext;
- AsyncCallableWithConnectionCallback(final AsyncCallableWithConnection callable) {
+ AsyncCallableWithConnectionCallback(final AsyncCallableWithConnection callable,
+ final OperationContext serverSelectionOperationContext,
+ final OperationContext originalOperationContext) {
this.callable = callable;
+ this.serverSelectionOperationContext = serverSelectionOperationContext;
+ this.originalOperationContext = originalOperationContext;
}
@Override
public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) {
if (t != null) {
- callable.call(null, t);
+ callable.call(null, originalOperationContext, t);
} else {
- withAsyncConnectionSourceCallableConnection(Assertions.assertNotNull(source), callable);
+ withAsyncConnectionSourceCallableConnection(assertNotNull(source));
}
}
+
+ void withAsyncConnectionSourceCallableConnection(final AsyncConnectionSource source) {
+ source.getConnection(serverSelectionOperationContext, (connection, t) -> {
+ source.release();
+ ServerDescription serverDescription = source.getServerDescription();
+ if (t != null) {
+ callable.call(null, originalOperationContext.withMinRoundTripTime(serverDescription), t);
+ } else {
+ callable.call(connection, originalOperationContext.withMinRoundTripTime(serverDescription), null);
+ }
+ });
+ }
}
private static class AsyncCallableWithSourceCallback implements SingleResultCallback {
@@ -456,14 +500,14 @@ private static SingleResultCallback addingRetryableLabelCallback(final Si
}
private static SingleResultCallback transformingReadCallback(final CommandReadTransformerAsync transformer,
- final AsyncConnectionSource source, final AsyncConnection connection, final SingleResultCallback callback) {
+ final AsyncConnectionSource source, final AsyncConnection connection, final OperationContext operationContext, final SingleResultCallback callback) {
return (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
R transformedResult;
try {
- transformedResult = transformer.apply(assertNotNull(result), source, connection);
+ transformedResult = transformer.apply(assertNotNull(result), source, connection, operationContext);
} catch (Throwable e) {
callback.onResult(null, e);
return;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
index c5d56fda81c..0f7a7a5196d 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
@@ -23,6 +23,7 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -76,8 +77,8 @@ public String getCommandName() {
@Override
- public T execute(final WriteBinding binding) {
- return executeRetryableWrite(binding, getDatabaseName(), null, getFieldNameValidator(),
+ public T execute(final WriteBinding binding, final OperationContext operationContext) {
+ return executeRetryableWrite(binding, operationContext, getDatabaseName(), null, getFieldNameValidator(),
CommandResultDocumentCodec.create(getDecoder(), "value"),
getCommandCreator(),
FindAndModifyHelper.transformer(),
@@ -85,8 +86,8 @@ public T execute(final WriteBinding binding) {
}
@Override
- public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) {
- executeRetryableWriteAsync(binding, getDatabaseName(), null, getFieldNameValidator(),
+ public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) {
+ executeRetryableWriteAsync(binding, operationContext, getDatabaseName(), null, getFieldNameValidator(),
CommandResultDocumentCodec.create(getDecoder(), "value"),
getCommandCreator(),
FindAndModifyHelper.asyncTransformer(), cmd -> cmd, callback);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
index c4bd72a4775..a750637a10e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.mongodb.internal.operation;
+
import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoException;
import com.mongodb.MongoOperationTimeoutException;
@@ -23,6 +23,7 @@
import com.mongodb.ServerCursor;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.binding.ReadBinding;
+import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
@@ -32,15 +33,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;
/**
- * A change stream cursor that wraps {@link CommandBatchCursor} with automatic resumption capabilities in the event
+ * A change stream cursor that wraps {@link Cursor} with automatic resumption capabilities in the event
* of timeouts or transient errors.
*
* Upon encountering a resumable error during {@code hasNext()}, {@code next()}, or {@code tryNext()} calls, the {@link ChangeStreamBatchCursor}
@@ -55,11 +56,14 @@
*
*/
final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor {
- private final ReadBinding binding;
+ private ReadBinding binding;
+ /**
+ * The initial operation context, which is used as an initial state to create new operation contexts for each operation.
+ */
+ private final OperationContext initialOperationContext;
private final ChangeStreamOperation changeStreamOperation;
private final int maxWireVersion;
- private final TimeoutContext timeoutContext;
- private CommandBatchCursor wrapped;
+ private Cursor wrapped;
private BsonDocument resumeToken;
private final AtomicBoolean closed;
@@ -71,13 +75,14 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor changeStreamOperation,
- final CommandBatchCursor wrapped,
+ final Cursor wrapped,
final ReadBinding binding,
+ final OperationContext operationContext,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
- this.timeoutContext = binding.getOperationContext().getTimeoutContext();
this.changeStreamOperation = changeStreamOperation;
this.binding = binding.retain();
+ this.initialOperationContext = operationContext.withOverride(TimeoutContext::withMaxTimeAsMaxAwaitTimeOverride);
this.wrapped = wrapped;
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
@@ -85,29 +90,29 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor getWrapped() {
+ Cursor getWrapped() {
return wrapped;
}
@Override
public boolean hasNext() {
- return resumeableOperation(commandBatchCursor -> {
+ return resumeableOperation((cursor, operationContext) -> {
try {
- return commandBatchCursor.hasNext();
+ return cursor.hasNext(operationContext);
} finally {
- cachePostBatchResumeToken(commandBatchCursor);
+ cachePostBatchResumeToken(cursor);
}
});
}
@Override
public List next() {
- return resumeableOperation(commandBatchCursor -> {
+ return resumeableOperation((cursor, operationContext) -> {
try {
- return convertAndProduceLastId(commandBatchCursor.next(), changeStreamOperation.getDecoder(),
+ return convertAndProduceLastId(cursor.next(operationContext), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(commandBatchCursor);
+ cachePostBatchResumeToken(cursor);
}
});
}
@@ -119,13 +124,13 @@ public int available() {
@Override
public List tryNext() {
- return resumeableOperation(commandBatchCursor -> {
+ return resumeableOperation((cursor, operationContext) -> {
try {
- List tryNext = commandBatchCursor.tryNext();
+ List tryNext = cursor.tryNext(operationContext);
return tryNext == null ? null
: convertAndProduceLastId(tryNext, changeStreamOperation.getDecoder(), lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(commandBatchCursor);
+ cachePostBatchResumeToken(cursor);
}
});
}
@@ -133,8 +138,7 @@ public List tryNext() {
@Override
public void close() {
if (!closed.getAndSet(true)) {
- timeoutContext.resetTimeoutIfPresent();
- wrapped.close();
+ wrapped.close(initialOperationContext);
binding.release();
}
}
@@ -184,9 +188,9 @@ public int getMaxWireVersion() {
return maxWireVersion;
}
- private void cachePostBatchResumeToken(final AggregateResponseBatchCursor commandBatchCursor) {
- if (commandBatchCursor.getPostBatchResumeToken() != null) {
- resumeToken = commandBatchCursor.getPostBatchResumeToken();
+ private void cachePostBatchResumeToken(final Cursor cursor) {
+ if (cursor.getPostBatchResumeToken() != null) {
+ resumeToken = cursor.getPostBatchResumeToken();
}
}
@@ -210,10 +214,10 @@ static List convertAndProduceLastId(final List