Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.operation;

import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;

/**
* An async-only operation that performs a write followed by a read that returns a cursor.
*
* <p>Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding}
* so that both the write and the read portions can be executed without narrowing casts.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncWriteThenReadOperationCursor<T> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a new interface that will be used by OperationExecutorImpl
It adds a new override to execute that accepts this interface instead of ReadOperation
All methods here were copied from ReadOperation interface


/**
* @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate")
*/
String getCommandName();

/**
* @return the namespace the write phase targets
*/
MongoNamespace getNamespace();

/**
* Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor}
* over the results of the read.
*
* @param binding the read-write binding used by both phases
* @param operationContext the operation context to use
* @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase
*/
void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext,
SingleResultCallback<AsyncBatchCursor<T>> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.observability.micrometer.TransactionSpan;
import com.mongodb.internal.operation.AbortTransactionOperation;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.CommitTransactionOperation;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteConcernHelper;
Expand Down Expand Up @@ -88,7 +89,7 @@ public boolean notifyMessageSent() {

@Override
public void notifyOperationInitiated(final Object operation) {
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation);
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor);
if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) {
assertTrue(getPinnedServerAddress() == null
|| (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -201,10 +202,46 @@ public ReadOperationCursor<T> asReadOperation(final int initialBatchSize) {
if (inline) {
// initialBatchSize is ignored for map reduce operations.
return createMapReduceInlineOperation();
} else {
return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize));
}
throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; "
+ "asReadOperation must not be called.");
}

@Override
public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) {
if (inline) {
return super.batchCursor(initialBatchSize);
}
return writeThenReadBatchCursor(initialBatchSize);
}
Comment on lines +211 to +216
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test here


@Override
public Publisher<T> first() {
if (inline) {
return super.first();
}
return writeThenReadBatchCursor(1)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this number 1 comes from BatchCursorPublisher

.flatMap(batchCursor -> {
batchCursor.setBatchSize(1);
return Mono.from(batchCursor.next())
.doOnTerminate(batchCursor::close)
.flatMap(results -> {
if (results == null || results.isEmpty()) {
return Mono.empty();
}
return Mono.fromCallable(() -> results.get(0));
});
});
}

private Mono<BatchCursor<T>> writeThenReadBatchCursor(final int initialBatchSize) {
return getMongoOperationPublisher()
.createWriteThenReadOperationMono(
operations -> operations.createTimeoutSettings(maxTimeMS),
() -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize)),
getClientSession())
.map(BatchCursor::new);
}

private WrappedMapReduceReadOperation<T> createMapReduceInlineOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.IndexHelper;
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
Expand Down Expand Up @@ -513,6 +515,15 @@ <T> Mono<T> createReadOperationMono(final Supplier<TimeoutSettings> timeoutSetti
.execute(readOperation, readPreference, getReadConcern(), clientSession);
}

<R> Mono<AsyncBatchCursor<R>> createWriteThenReadOperationMono(
final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<AsyncWriteThenReadOperationCursor<R>> operationSupplier,
@Nullable final ClientSession clientSession) {
AsyncWriteThenReadOperationCursor<R> operation = operationSupplier.get();
return getExecutor(timeoutSettingsFunction.apply(operations))
.execute(operation, getReadConcern(), clientSession);
}

<R> Mono<R> createWriteOperationMono(final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<WriteOperation<R>> operationSupplier, @Nullable final ClientSession clientSession) {
return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
Expand Down Expand Up @@ -54,6 +56,20 @@ <T> Mono<T> execute(ReadOperation<?, T> operation, ReadPreference readPreference
*/
<T> Mono<T> execute(WriteOperation<T> operation, ReadConcern readConcern, @Nullable ClientSession session);

/**
* Execute an operation that writes and then reads a cursor within a single read-write binding.
*
* <p>The binding is acquired once and used for both phases, avoiding the need to narrow an
* {@code AsyncReadBinding} to an {@code AsyncWriteBinding}.
*
* @param operation the write-then-read operation.
* @param readConcern the read concern
* @param session the session to associate this operation with
* @param <T> the document type returned by the cursor.
*/
<T> Mono<AsyncBatchCursor<T>> execute(AsyncWriteThenReadOperationCursor<T> operation,
ReadConcern readConcern, @Nullable ClientSession session);

/**
* Create a new OperationExecutor with a specific timeout settings
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
Expand Down Expand Up @@ -179,6 +181,51 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
);
}

@Override
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readConcern", readConcern);

Comment on lines +184 to +190
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added notifyOperationInitiated call , also implementationf of notifyOperationInitiated now also checked instance of AsyncWriteThenRead

if (session != null) {
session.notifyOperationInitiated(operation);
}

return Mono.from(subscriber ->
clientSessionHelper.withClientSession(session, this)
.flatMap(actualClientSession -> {
AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null);
RequestContext requestContext = getContext(subscriber);
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
isImplicitSession(session), readConcern));
Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
operationContext, operation.getCommandName(), operation.getNamespace());

return Mono.<AsyncBatchCursor<T>>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
try {
binding.release();
} finally {
if (t != null) {
Throwable exceptionToHandle = t instanceof MongoException
? OperationHelper.unwrap((MongoException) t) : t;
labelException(session, exceptionToHandle);
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
if (span != null) {
span.error(t);
}
}
if (span != null) {
span.end();
}
sinkToCallback(sink).onResult(result, t);
}
}));
}).subscribe(subscriber)
);
}

@Override
public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) {
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperationCursor;
import com.mongodb.internal.operation.WriteOperation;

class VoidWriteOperationThenCursorReadOperation<T> implements ReadOperationCursorAsyncOnly<T> {
class VoidWriteOperationThenCursorReadOperation<T> implements AsyncWriteThenReadOperationCursor<T> {
private final WriteOperation<Void> writeOperation;
private final ReadOperationCursor<T> cursorReadOperation;

Expand All @@ -46,8 +46,9 @@ public MongoNamespace getNamespace() {
}

@Override
public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> {
public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
final SingleResultCallback<AsyncBatchCursor<T>> callback) {
writeOperation.executeAsync(binding, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Sorts;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.MapReduceStatistics;
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;
import com.mongodb.reactivestreams.client.MapReducePublisher;


Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed empty line

Comment thread
strogiyotec marked this conversation as resolved.
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonJavaScript;
Expand All @@ -39,6 +43,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

@SuppressWarnings({"rawtypes", "deprecation"})
Expand All @@ -48,6 +53,44 @@ public class MapReducePublisherImplTest extends TestHelper {
private static final String REDUCE_FUNCTION = "reduceFunction(){}";
private static final String FINALIZE_FUNCTION = "finalizeFunction(){}";

@DisplayName("Non-inline MapReduce routes through the write-then-read executor path")
@Test
void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() {
configureBatchCursor();
TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));

MapReducePublisher<Document> publisher =
new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
MAP_FUNCTION, REDUCE_FUNCTION)
.collectionName(NAMESPACE.getCollectionName());

Flux.from(publisher).blockFirst();

AsyncWriteThenReadOperationCursor<?> op = executor.getWriteThenReadOperation();
assertNotNull(op, "expected a write-then-read operation");
assertEquals(NAMESPACE, op.getNamespace());
// Must not fall through to the plain read-operation path.
assertNull(executor.getReadOperation());
}


@DisplayName("Inline MapReduce still routes through the read-operation path")
@Test
void shouldRouteInlineMapReduceThroughReadOperationPath() {
configureBatchCursor();
TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));

MapReducePublisher<Document> publisher =
new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline

Flux.from(publisher).blockFirst();

assertNotNull(executor.getReadOperation());
assertNull(executor.getWriteThenReadOperation());
}


@DisplayName("Should build the expected MapReduceWithInlineResultsOperation")
@Test
void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mongodb.internal.bulk.IndexRequest;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.client.model.FindOptions;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -93,7 +94,10 @@ public class TestHelper {

Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(), any(), any());
.execute(any(WriteOperation.class), any(), any());
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(AsyncWriteThenReadOperationCursor.class), any(), any());
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(), any(), any(), any());
Expand Down
Loading