Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSOT: Adjust timeouts and increase test coverage #1383

Merged
merged 20 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static <T> T throwMongoTimeoutException(final String message) {
}

public static MongoOperationTimeoutException createMongoTimeoutException(final Throwable cause) {
return createMongoTimeoutException("Operation timed out: " + cause.getMessage(), cause);
return createMongoTimeoutException("Operation exceeded the timeout limit: " + cause.getMessage(), cause);
}

public static MongoOperationTimeoutException createMongoTimeoutException(final String message, final Throwable cause) {
Expand Down Expand Up @@ -173,7 +173,7 @@ public long timeoutOrAlternative(final long alternativeTimeoutMS) {
return timeout.call(MILLISECONDS,
() -> 0L,
(ms) -> ms,
() -> throwMongoTimeoutException("The operation timeout has expired."));
() -> throwMongoTimeoutException("The operation exceeded the timeout limit."));
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ public int getConnectTimeoutMs() {
return Math.toIntExact(Timeout.nullAsInfinite(timeout).call(MILLISECONDS,
() -> connectTimeoutMS,
(ms) -> connectTimeoutMS == 0 ? ms : Math.min(ms, connectTimeoutMS),
() -> throwMongoTimeoutException("The operation timeout has expired.")));
() -> throwMongoTimeoutException("The operation exceeded the timeout limit.")));
}

public void resetTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void doAdvanceOrThrow(final Throwable attemptException,
*/
if (hasTimeoutMs() && !loopState.isLastIteration()) {
previouslyChosenException = createMongoTimeoutException(
"Retry attempt timed out.",
"Retry attempt exceeded the timeout limit.",
previouslyChosenException);
}
throw previouslyChosenException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void write(final List<ByteBuf> buffers, final OperationContext operationC
for (final ByteBuf cur : buffers) {
outputStream.write(cur.array(), 0, cur.limit());
if (operationContext.getTimeoutContext().hasExpired()) {
throwMongoTimeoutException("Timeout occurred during socket write.");
throwMongoTimeoutException("Socket write exceeded the timeout limit.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ public void writeAsync(final List<ByteBuf> buffers, final OperationContext opera
}

long writeTimeoutMS = operationContext.getTimeoutContext().getWriteTimeoutMS();
Optional<WriteTimeoutHandler> writeTimeoutHandler = writeTimeoutMS != NO_SCHEDULE_TIME
? Optional.of(new WriteTimeoutHandler(writeTimeoutMS, MILLISECONDS)) : Optional.empty();
writeTimeoutHandler.map(w -> channel.pipeline().addBefore("ChannelInboundHandlerAdapter", "WriteTimeoutHandler", w));

final Optional<WriteTimeoutHandler> writeTimeoutHandler = addWriteTimeoutHandler(writeTimeoutMS);
channel.writeAndFlush(composite).addListener((ChannelFutureListener) future -> {
writeTimeoutHandler.map(w -> channel.pipeline().remove(w));
if (!future.isSuccess()) {
Expand All @@ -267,6 +264,15 @@ public void writeAsync(final List<ByteBuf> buffers, final OperationContext opera
});
}

/**
 * Installs a write-timeout handler on the channel pipeline when a write timeout is configured.
 *
 * <p>If {@code writeTimeoutMS} equals {@code NO_SCHEDULE_TIME}, nothing is added and an empty
 * {@link Optional} is returned. Otherwise a new {@code WriteTimeoutHandler} is created with the
 * given timeout in milliseconds and registered under the name {@code "WriteTimeoutHandler"},
 * positioned before the handler named {@code "ChannelInboundHandlerAdapter"}.</p>
 *
 * @param writeTimeoutMS the write timeout in milliseconds, or {@code NO_SCHEDULE_TIME} for none
 * @return the handler that was added to the pipeline, or an empty {@link Optional} if none was added
 */
private Optional<WriteTimeoutHandler> addWriteTimeoutHandler(final long writeTimeoutMS) {
    if (writeTimeoutMS == NO_SCHEDULE_TIME) {
        return Optional.empty();
    }
    final WriteTimeoutHandler handler = new WriteTimeoutHandler(writeTimeoutMS, MILLISECONDS);
    channel.pipeline().addBefore("ChannelInboundHandlerAdapter", "WriteTimeoutHandler", handler);
    return Optional.of(handler);
}

@Override
public void readAsync(final int numBytes, final OperationContext operationContext, final AsyncCompletionHandler<ByteBuf> handler) {
readAsync(numBytes, handler, operationContext.getTimeoutContext().getReadTimeoutMS());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ public List<CommandFailedEvent> getCommandFailedEvents() {
return getEvents(CommandFailedEvent.class, Integer.MAX_VALUE);
}

/**
 * Returns the recorded {@link CommandFailedEvent}s whose command name equals {@code commandName}.
 *
 * @param commandName the command name to match against each event's {@code getCommandName()}
 * @return the matching command-failed events
 */
public List<CommandFailedEvent> getCommandFailedEvents(final String commandName) {
    // Integer.MAX_VALUE is passed as the last argument; presumably a count limit, i.e. "no limit".
    return getEvents(
            CommandFailedEvent.class,
            failedEvent -> failedEvent.getCommandName().equals(commandName),
            Integer.MAX_VALUE);
}

/**
 * Returns the recorded events of type {@link CommandStartedEvent}.
 *
 * @return the command-started events obtained from {@code getEvents}
 */
public List<CommandStartedEvent> getCommandStartedEvents() {
    // Integer.MAX_VALUE is passed as the last argument; presumably a count limit, i.e. "no limit".
    final List<CommandStartedEvent> startedEvents = getEvents(CommandStartedEvent.class, Integer.MAX_VALUE);
    return startedEvents;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"description": "timeoutMS behaves correctly for non-tailable cursors",
"comment": "Manually reduced blockTimeMS for tests to pass in serverless",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wowzers

"schemaVersion": "1.9",
"runOnRequirements": [
{
Expand Down Expand Up @@ -143,7 +144,7 @@
"getMore"
],
"blockConnection": true,
"blockTimeMS": 125
"blockTimeMS": 101
}
}
}
Expand Down Expand Up @@ -221,7 +222,7 @@
"getMore"
],
"blockConnection": true,
"blockTimeMS": 150
"blockTimeMS": 101
}
}
}
Expand Down Expand Up @@ -355,7 +356,7 @@
"getMore"
],
"blockConnection": true,
"blockTimeMS": 125
"blockTimeMS": 101
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final class RetryStateTest {

private static final TimeoutContext TIMEOUT_CONTEXT_INFINITE_GLOBAL_TIMEOUT = new TimeoutContext(new TimeoutSettings(0L, 0L,
0L, 0L, 0L));
private static final String EXPECTED_TIMEOUT_MESSAGE = "Retry attempt timed out.";
private static final String EXPECTED_TIMEOUT_MESSAGE = "Retry attempt exceeded the timeout limit.";

static Stream<Arguments> infiniteTimeout() {
return Stream.of(
Expand Down Expand Up @@ -302,7 +302,7 @@ void advanceThrowTimeoutExceptionWhenTransformerSwallowOriginalTimeoutException(
(rs, e) -> false));

Assertions.assertNotEquals(actualTimeoutException, expectedTimeoutException);
Assertions.assertEquals("Retry attempt timed out.", actualTimeoutException.getMessage());
Assertions.assertEquals(EXPECTED_TIMEOUT_MESSAGE, actualTimeoutException.getMessage());
Assertions.assertEquals(previousAttemptException, actualTimeoutException.getCause(),
"Retry timeout exception should have a cause if transformer returned non-timeout exception.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ public Publisher<Void> commitTransaction() {
.execute(
new CommitTransactionOperation(writeConcern, alreadyCommitted)
.recoveryToken(getRecoveryToken()), readConcern, this)
.doOnSuccess(ignored -> setTimeoutContext(null))
.doOnTerminate(() -> {
commitInProgress = false;
transactionState = TransactionState.COMMITTED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private static Mono<MongoOperationTimeoutException> createMonoTimer(final @Nulla

private static Mono<MongoOperationTimeoutException> createTimeoutMonoError() {
return Mono.error(TimeoutContext.createMongoTimeoutException(
"GridFS timed out waiting for data from provided source Publisher"));
"GridFS waiting for data from the source Publisher exceeded the timeout limit."));
}

private Mono<InsertOneResult> createSaveFileDataMono(final AtomicBoolean terminated,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -107,7 +105,6 @@ protected boolean isAsync() {
return true;
}

@Tag("setsFailPoint")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out")
@Test
@Override
Expand All @@ -117,7 +114,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {

//given
collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
+ " mode: { times: 1 },"
+ " data: {"
+ " failCommands: [\"insert\"],"
Expand Down Expand Up @@ -164,7 +161,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
}
}

@Tag("setsFailPoint")
@DisplayName("6. GridFS Upload - Aborting an upload stream can be timed out")
@Test
@Override
Expand All @@ -177,7 +173,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
Hooks.onErrorDropped(droppedErrorFuture::complete);

collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
+ " mode: { times: 1 },"
+ " data: {"
+ " failCommands: [\"delete\"],"
Expand Down Expand Up @@ -227,7 +223,6 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
/**
* Not a prose spec test. However, it is an additional test case for better coverage.
*/
@Tag("setsFailPoint")
@DisplayName("TimeoutMS applies to full resume attempt in a next call")
@Test
public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() {
Expand Down Expand Up @@ -283,7 +278,6 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() {
/**
* Not a prose spec test. However, it is an additional test case for better coverage.
*/
@Tag("setsFailPoint")
@DisplayName("TimeoutMS applied to initial aggregate")
@Test
public void testTimeoutMSAppliedToInitialAggregate() {
Expand Down Expand Up @@ -332,7 +326,6 @@ public void testTimeoutMSAppliedToInitialAggregate() {
/**
* Not a prose spec test. However, it is an additional test case for better coverage.
*/
@Tag("setsFailPoint")
@DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is not set")
@Test
public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
Expand Down Expand Up @@ -402,7 +395,6 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
/**
* Not a prose spec test. However, it is an additional test case for better coverage.
*/
@Tag("setsFailPoint")
@DisplayName("TimeoutMS is refreshed for getMore if maxAwaitTimeMS is set")
@Test
public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
Expand All @@ -420,7 +412,8 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
.timeout(rtt + 300, TimeUnit.MILLISECONDS))) {

MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
.getCollection(namespace.getCollectionName())
.withReadPreference(ReadPreference.primary());

collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
Expand Down Expand Up @@ -449,6 +442,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
.expectNextCount(2)
.thenAwait(Duration.ofMillis(600))
.thenRequest(2)
.expectNextCount(2)
.thenCancel()
.verify();

Expand All @@ -463,7 +457,6 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
/**
* Not a prose spec test. However, it is an additional test case for better coverage.
*/
@Tag("setsFailPoint")
@DisplayName("TimeoutMS is honored for next operation when several getMore executed internally")
@Test
public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInternally() {
Expand Down Expand Up @@ -528,8 +521,8 @@ public void setUp() {

@Override
@AfterEach
public void tearDown(final TestInfo info) {
super.tearDown(info);
public void tearDown() {
super.tearDown();
SyncMongoClient.disableSleep();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static com.mongodb.client.ClientSideOperationTimeoutTest.checkSkipCSOTTest;
import static com.mongodb.client.ClientSideOperationTimeoutTest.racyTestAssertion;
import static com.mongodb.client.ClientSideOperationTimeoutTest.skipOperationTimeoutTests;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableSleep;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorError;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.junit.Assume.assumeFalse;

// See https://github.com/mongodb/specifications/tree/master/source/client-side-operation-timeout/tests
Expand All @@ -53,7 +54,7 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String
final BsonArray initialData, final BsonDocument definition) {
super(schemaVersion, runOnRequirements, entities, initialData, definition);
this.testDescription = testDescription;
// Time sensitive - cannot just create a cursor with publishers
skipOperationTimeoutTests(fileDescription, testDescription);

assumeFalse("No iterateOnce support. There is alternative prose test for it.",
testDescription.equals("timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set"));
Expand All @@ -74,18 +75,20 @@ The Reactive Streams specification prevents us from allowing a subsequent next c
assumeFalse(testDescription.endsWith("createChangeStream on client"));
assumeFalse(testDescription.endsWith("createChangeStream on database"));
assumeFalse(testDescription.endsWith("createChangeStream on collection"));
assumeFalse("TODO (CSOT) - JAVA-5104", fileDescription.equals("timeoutMS behaves correctly during command execution")
&& testDescription.equals("command is not sent if RTT is greater than timeoutMS"));

// No withTransaction support
assumeFalse(fileDescription.contains("withTransaction") || testDescription.contains("withTransaction"));

checkSkipCSOTTest(fileDescription, testDescription);

if (testDescription.equals("timeoutMS is refreshed for close")) {
enableSleepAfterCursorError(256);
}

/*
* The test is occasionally racy. The "killCursors" command may appear as an additional event. This is unexpected in unified tests,
* but anticipated in reactive streams because an operation timeout error triggers the closure of the stream/publisher.
*/
ignoreExtraCommandEvents(testDescription.contains("timeoutMS is refreshed for getMore - failure"));

Hooks.onOperatorDebug();
Hooks.onErrorDropped(atomicReferenceThrowable::set);
}
Expand Down Expand Up @@ -128,6 +131,14 @@ public void cleanUp() {
Hooks.resetOnErrorDropped();
}

/**
 * Tells whether an assertion failure is the known, tolerated event-count mismatch of a racy getMore test.
 *
 * <p>The result is {@code true} only when {@code testDescription} is listed in
 * {@code RACY_GET_MORE_TESTS} and the error's message starts with
 * {@code "Number of events must be the same"}.</p>
 *
 * @param testDescription the description of the test that raised the error
 * @param e the assertion error raised by the test
 * @return {@code true} if the failure matches the racy pattern; {@code false} otherwise, including
 *         when the error carries no message
 */
public static boolean racyTestAssertion(final String testDescription, final AssertionError e) {
    // An AssertionError built without a detail message has a null message. Report it as
    // "not racy" instead of throwing a NullPointerException that would hide the real failure.
    final String message = e.getMessage();
    return message != null
            && RACY_GET_MORE_TESTS.contains(testDescription)
            && message.startsWith("Number of events must be the same");
}

/** Descriptions of the getMore tests for which an event-count mismatch is treated as a known race. */
private static final List<String> RACY_GET_MORE_TESTS = asList(
        "remaining timeoutMS applied to getMore if timeoutMode is cursor_lifetime",
        "remaining timeoutMS applied to getMore if timeoutMode is unset");

private void assertNoDroppedError(final String message) {
Throwable droppedError = atomicReferenceThrowable.get();
if (droppedError != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void shouldTimeoutWhenSourcePublisherCompletionExceedsOverallOperationTimeout()

Throwable throwable = onErrorEvents.get(0);
assertEquals(MongoOperationTimeoutException.class, throwable.getClass());
assertEquals("GridFS timed out waiting for data from provided source Publisher", throwable.getMessage());
assertEquals("GridFS waiting for data from the source Publisher exceeded the timeout limit.", throwable.getMessage());

//assert no chunk has been inserted as we have not sent any data from source publisher.
for (CommandEvent event : commandListener.getEvents()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

final class GridFSBucketImpl implements GridFSBucket {
private static final int DEFAULT_CHUNKSIZE_BYTES = 255 * 1024;
private static final String TIMEOUT_MESSAGE = "GridFS operation timed out";
private static final String TIMEOUT_MESSAGE = "GridFS operation exceeded the timeout limit.";
private final String bucketName;
private final int chunkSizeBytes;
private final MongoCollection<GridFSFile> filesCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import static java.lang.String.format;

class GridFSDownloadStreamImpl extends GridFSDownloadStream {
private static final String TIMEOUT_MESSAGE = "The GridFS download stream has timed out";
private static final String TIMEOUT_MESSAGE = "The GridFS download stream exceeded the timeout limit.";
private final ClientSession clientSession;
private final GridFSFile fileInfo;
private final MongoCollection<BsonDocument> chunksCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static com.mongodb.internal.Locks.withInterruptibleLock;

final class GridFSUploadStreamImpl extends GridFSUploadStream {
public static final String TIMEOUT_MESSAGE = "The GridFS upload stream has timed out";
public static final String TIMEOUT_MESSAGE = "The GridFS upload stream exceeded the timeout limit.";
private final ClientSession clientSession;
private final MongoCollection<GridFSFile> filesCollection;
private final MongoCollection<BsonDocument> chunksCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void setSocketSoTimeoutToOperationTimeout() throws SocketException {
throw new AssertionError("operationTimeout cannot be infinite");
},
(ms) -> socket.setSoTimeout(Math.toIntExact(ms)),
() -> TimeoutContext.throwMongoTimeoutException("Timeout has expired while reading from KMS server"));
() -> TimeoutContext.throwMongoTimeoutException("Reading from KMS server exceeded the timeout limit."));
}

@Override
Expand Down
Loading