diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
index 55684298e29..64f0194ce45 100644
--- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
+++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java
@@ -209,7 +209,7 @@ public int hashCode() {
}
@Nullable
- private static Timeout calculateTimeout(@Nullable final Long timeoutMS) {
+ public static Timeout calculateTimeout(@Nullable final Long timeoutMS) {
if (timeoutMS != null) {
return timeoutMS == 0 ? Timeout.infinite() : Timeout.expiresIn(timeoutMS, MILLISECONDS);
}
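
For context, making `calculateTimeout` public lets the GridFS code later in this diff derive a per-operation deadline from a collection's `timeoutMS`. A minimal sketch of that call pattern (hypothetical class name; it relies only on the `Timeout` methods this diff already uses — `isInfinite`, `hasExpired`, `remaining`):

```java
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class CalculateTimeoutSketch {
    // null timeoutMS -> null (CSOT not configured); 0 -> Timeout.infinite(); > 0 -> expiring deadline
    static void demo(@Nullable final Long timeoutMS) {
        Timeout operationTimeout = TimeoutContext.calculateTimeout(timeoutMS);
        if (operationTimeout != null && !operationTimeout.isInfinite() && !operationTimeout.hasExpired()) {
            long remainingMs = operationTimeout.remaining(MILLISECONDS); // shrinks across the commands of one operation
            System.out.println("budget left: " + remainingMs + "ms");
        }
    }
}
```
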
diff --git a/driver-core/src/test/functional/com/mongodb/client/CommandMonitoringTestHelper.java b/driver-core/src/test/functional/com/mongodb/client/CommandMonitoringTestHelper.java
index 8ba3a5b3851..3a99277ef65 100644
--- a/driver-core/src/test/functional/com/mongodb/client/CommandMonitoringTestHelper.java
+++ b/driver-core/src/test/functional/com/mongodb/client/CommandMonitoringTestHelper.java
@@ -120,11 +120,11 @@ static boolean isWriteCommand(final String commandName) {
return asList("insert", "update", "delete").contains(commandName);
}
- public static void assertEventsEquality(final List<CommandEvent> expectedEvents, final List<CommandEvent> events) {
+ public static void assertEventsEquality(final List<CommandEvent> expectedEvents, final List<? extends CommandEvent> events) {
assertEventsEquality(expectedEvents, events, null);
}
- public static void assertEventsEquality(final List<CommandEvent> expectedEvents, final List<CommandEvent> events,
+ public static void assertEventsEquality(final List<CommandEvent> expectedEvents, final List<? extends CommandEvent> events,
@Nullable final Map<String, BsonDocument> lsidMap) {
assertEquals(expectedEvents.size(), events.size());
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java
index 5c879f52595..c02065363f9 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java
@@ -144,24 +144,27 @@ public CommandFailedEvent getCommandFailedEvent(final String commandName) {
.orElseThrow(() -> new IllegalArgumentException(commandName + " not found in command failed event list"));
}
- public List<CommandEvent> getCommandStartedEvents() {
+ public List<CommandStartedEvent> getCommandStartedEvents() {
return getEvents(CommandStartedEvent.class, Integer.MAX_VALUE);
}
- public List<CommandEvent> getCommandSucceededEvents() {
+ public List<CommandSucceededEvent> getCommandSucceededEvents() {
return getEvents(CommandSucceededEvent.class, Integer.MAX_VALUE);
}
- private List<CommandEvent> getEvents(final Class<? extends CommandEvent> type, final int maxEvents) {
+ private <T extends CommandEvent> List<T> getEvents(final Class<T> type, final int maxEvents) {
lock.lock();
try {
- return getEvents().stream().filter(e -> e.getClass() == type).limit(maxEvents).collect(Collectors.toList());
+ return getEvents().stream()
+ .filter(e -> e.getClass() == type)
+ .map(type::cast)
+ .limit(maxEvents).collect(Collectors.toList());
} finally {
lock.unlock();
}
}
- public List<CommandEvent> waitForStartedEvents(final int numEvents) {
+ public List<CommandStartedEvent> waitForStartedEvents(final int numEvents) {
lock.lock();
try {
while (!hasCompletedEvents(numEvents)) {
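
The reworked `getEvents` uses a `Class<T>` token to filter and cast in one pass, which is what lets the public accessors above return typed lists. The same pattern in isolation (a hypothetical, self-contained sketch):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class TypedFilterSketch {
    // A Class<T> token both filters the heterogeneous list and casts each
    // match, so callers get e.g. List<CommandStartedEvent> with no manual casts.
    static <T> List<T> ofType(final List<?> items, final Class<T> type) {
        return items.stream()
                .filter(e -> e.getClass() == type) // exact-class match, as in getEvents above
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        List<Object> mixed = Arrays.asList("a", 1, "b");
        List<String> strings = ofType(mixed, String.class); // ["a", "b"]
        System.out.println(strings);
    }
}
```
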
diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/gridfs-download.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/gridfs-download.json
index 8542f69e898..3cf4a285fa6 100644
--- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/gridfs-download.json
+++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/gridfs-download.json
@@ -299,7 +299,7 @@
"find"
],
"blockConnection": true,
- "blockTimeMS": 50
+ "blockTimeMS": 40
}
}
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/gridfs/GridFSFindPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/gridfs/GridFSFindPublisher.java
index dbc4b645c5d..75f7a7cac10 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/gridfs/GridFSFindPublisher.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/gridfs/GridFSFindPublisher.java
@@ -16,12 +16,9 @@
package com.mongodb.reactivestreams.client.gridfs;
-import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Collation;
import com.mongodb.lang.Nullable;
-import com.mongodb.reactivestreams.client.MongoCollection;
-import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
@@ -127,17 +124,4 @@ public interface GridFSFindPublisher extends Publisher<GridFSFile> {
* @mongodb.driver.manual reference/method/cursor.batchSize/#cursor.batchSize Batch Size
*/
GridFSFindPublisher batchSize(int batchSize);
-
- /**
- * Sets the timeoutMode for the cursor.
- *
- * <p>
- * Requires the {@code timeout} to be set, either in the {@link com.mongodb.MongoClientSettings},
- * via {@link MongoDatabase} or via {@link MongoCollection}
- * </p>
- * @param timeoutMode the timeout mode
- * @return this
- * @since CSOT
- */
- GridFSFindPublisher timeoutMode(TimeoutMode timeoutMode);
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
index d92f68154dc..87f0217289a 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSBucketImpl.java
@@ -72,7 +72,7 @@ public GridFSBucketImpl(final MongoDatabase database, final String bucketName) {
getChunksCollection(database, bucketName));
}
- GridFSBucketImpl(final String bucketName, final int chunkSizeBytes, final MongoCollection<GridFSFile> filesCollection,
+ private GridFSBucketImpl(final String bucketName, final int chunkSizeBytes, final MongoCollection<GridFSFile> filesCollection,
final MongoCollection<Document> chunksCollection) {
this.bucketName = notNull("bucketName", bucketName);
this.chunkSizeBytes = chunkSizeBytes;
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSFindPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSFindPublisherImpl.java
index c917e64e137..6f1a501a33f 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSFindPublisherImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSFindPublisherImpl.java
@@ -16,7 +16,6 @@
package com.mongodb.reactivestreams.client.internal.gridfs;
-import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Collation;
import com.mongodb.lang.Nullable;
@@ -94,12 +93,6 @@ public GridFSFindPublisher batchSize(final int batchSize) {
return this;
}
- @Override
- public GridFSFindPublisher timeoutMode(final TimeoutMode timeoutMode) {
- wrapped.timeoutMode(timeoutMode);
- return this;
- }
-
@Override
public void subscribe(final Subscriber<? super GridFSFile> s) {
wrapped.subscribe(s);
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
index 0cb2e49f572..0d5d509a47c 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
@@ -19,17 +19,44 @@
import com.mongodb.MongoClientSettings;
import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest;
import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
+import com.mongodb.reactivestreams.client.syncadapter.SyncGridFSBucket;
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
+import org.junit.jupiter.api.Disabled;
/**
* See https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.rst#prose-tests
*/
public final class ClientSideOperationTimeoutProseTest extends AbstractClientSideOperationsTimeoutProseTest {
+ private com.mongodb.reactivestreams.client.MongoClient wrapped;
@Override
protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) {
- return new SyncMongoClient(MongoClients.create(mongoClientSettings));
+ wrapped = MongoClients.create(mongoClientSettings);
+ return new SyncMongoClient(wrapped);
+ }
+
+ @Override
+ protected GridFSBucket createGridFsBucket(final MongoDatabase mongoDatabase, final String bucketName) {
+ return new SyncGridFSBucket(GridFSBuckets.create(wrapped.getDatabase(mongoDatabase.getName()), bucketName));
+ }
+
+ @Override
+ @Disabled("TODO (CSOT) - JAVA-4057")
+ public void testGridFSUploadViaOpenUploadStreamTimeout() {
+ }
+
+ @Disabled("TODO (CSOT) - JAVA-4057")
+ @Override
+ public void testAbortingGridFsUploadStreamTimeout() {
+ }
+
+ @Disabled("TODO (CSOT) - JAVA-4057")
+ @Override
+ public void testGridFsDownloadStreamTimeout() {
}
@Override
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ReadConcernTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ReadConcernTest.java
index 15b1bc7f5cf..eef7cfbe9fe 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ReadConcernTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ReadConcernTest.java
@@ -17,7 +17,6 @@
package com.mongodb.reactivestreams.client;
import com.mongodb.ReadConcern;
-import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import org.bson.BsonDocument;
@@ -65,7 +64,7 @@ public void shouldIncludeReadConcernInCommand() throws InterruptedException {
.find())
.block(TIMEOUT_DURATION);
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
BsonDocument commandDocument = new BsonDocument("find", new BsonString("test"))
.append("readConcern", ReadConcern.LOCAL.asDocument())
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java
index 410dfd02fc4..ebbd2069f70 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java
@@ -373,7 +373,7 @@ public void testBatchCursorReportsCursorErrors() {
BsonDocument getMoreCommand = commandListener.getCommandStartedEvents().stream()
.filter(e -> e.getCommandName().equals("getMore"))
- .map(e -> ((CommandStartedEvent) e).getCommand())
+ .map(CommandStartedEvent::getCommand)
.findFirst()
.get();
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncGridFSBucket.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncGridFSBucket.java
index a09b4ffbec3..5ed29caf4a0 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncGridFSBucket.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncGridFSBucket.java
@@ -42,6 +42,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT;
@@ -79,6 +80,11 @@ public ReadConcern getReadConcern() {
return wrapped.getReadConcern();
}
+ @Override
+ public Long getTimeout(final TimeUnit timeUnit) {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
@Override
public GridFSBucket withChunkSizeBytes(final int chunkSizeBytes) {
return new SyncGridFSBucket(wrapped.withChunkSizeBytes(chunkSizeBytes));
@@ -99,6 +105,11 @@ public GridFSBucket withReadConcern(final ReadConcern readConcern) {
return new SyncGridFSBucket(wrapped.withReadConcern(readConcern));
}
+ @Override
+ public GridFSBucket withTimeout(final long timeout, final TimeUnit timeUnit) {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
@Override
public GridFSUploadStream openUploadStream(final String filename) {
return openUploadStream(filename, new GridFSUploadOptions());
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
index d3eba96c5c6..d79b17d02aa 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ClientSideOperationTimeoutTest.java
@@ -57,6 +57,7 @@ public ClientSideOperationTimeoutTest(final String fileDescription, final String
assumeFalse(testDescription.endsWith("createChangeStream on client"));
assumeFalse(testDescription.endsWith("createChangeStream on database"));
assumeFalse(testDescription.endsWith("createChangeStream on collection"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", fileDescription.contains("GridFS"));
checkSkipCSOTTest(fileDescription, testDescription);
if (testDescription.equals("timeoutMS is refreshed for close")) {
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/gridfs/GridFSFindObservable.scala b/driver-scala/src/main/scala/org/mongodb/scala/gridfs/GridFSFindObservable.scala
index 58f1c200242..fdbea9add70 100644
--- a/driver-scala/src/main/scala/org/mongodb/scala/gridfs/GridFSFindObservable.scala
+++ b/driver-scala/src/main/scala/org/mongodb/scala/gridfs/GridFSFindObservable.scala
@@ -121,27 +121,6 @@ case class GridFSFindObservable(private val wrapped: GridFSFindPublisher) extend
this
}
- /**
- * Sets the timeoutMode for the cursor.
- *
- * Requires the `timeout` to be set, either in the `MongoClientSettings`,
- * via `MongoDatabase` or via `MongoCollection`
- *
- * If the `timeout` is set then:
- *
- * - For non-tailable cursors, the default value of timeoutMode is `TimeoutMode.CURSOR_LIFETIME`
- * - For tailable cursors, the default value of timeoutMode is `TimeoutMode.ITERATION` and its an error
- * to configure it as: `TimeoutMode.CURSOR_LIFETIME`
- *
- * @param timeoutMode the timeout mode
- * @return this
- * @since CSOT
- */
- def timeoutMode(timeoutMode: TimeoutMode): GridFSFindObservable = {
- wrapped.timeoutMode(timeoutMode)
- this
- }
-
/**
* Helper to return a single observable limited to the first result.
*
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/CollectionTimeoutHelper.java b/driver-sync/src/main/com/mongodb/client/gridfs/CollectionTimeoutHelper.java
new file mode 100644
index 00000000000..9df56290b11
--- /dev/null
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/CollectionTimeoutHelper.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client.gridfs;
+
+import com.mongodb.MongoOperationTimeoutException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.internal.time.Timeout;
+import com.mongodb.lang.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+final class CollectionTimeoutHelper {
+ private CollectionTimeoutHelper() {
+ //NOP
+ }
+
+ public static <T> MongoCollection<T> collectionWithTimeout(final MongoCollection<T> collection,
+ final String message,
+ @Nullable final Timeout timeout) {
+ if (timeout != null && !timeout.isInfinite()) {
+ long remainingMs = timeout.remaining(MILLISECONDS);
+ if (timeout.hasExpired()) {
+ // TODO (CSOT) - JAVA-5248 Update to MongoOperationTimeoutException
+ throw new MongoOperationTimeoutException(message);
+ }
+ return collection.withTimeout(remainingMs, MILLISECONDS);
+ }
+ return collection;
+ }
+}
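
Intended usage of the new helper, as the GridFS classes below apply it: every command inside one logical GridFS operation is rescoped to the deadline's remaining budget, and an already-expired deadline fails fast. A sketch assuming a caller in the same package (the helper is package-private; class and method names here are illustrative):

```java
import com.mongodb.client.MongoCollection;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import org.bson.Document;

final class TimeoutHelperUsageSketch {
    // Each command run against the returned collection uses only the time
    // the operation-level deadline has left, not the original full timeout.
    @Nullable
    static Document firstChunk(final MongoCollection<Document> chunksCollection,
                               @Nullable final Timeout operationTimeout) {
        return CollectionTimeoutHelper
                .collectionWithTimeout(chunksCollection, "GridFS operation timed out", operationTimeout)
                .find()
                .first();
    }
}
```
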
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucket.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucket.java
index c32f114844c..d79ae468db7 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucket.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucket.java
@@ -21,14 +21,17 @@
import com.mongodb.WriteConcern;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.model.GridFSDownloadOptions;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+import com.mongodb.lang.Nullable;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
/**
* Represents a GridFS Bucket
@@ -76,6 +79,36 @@ public interface GridFSBucket {
*/
ReadConcern getReadConcern();
+ /**
+ * The time limit for the full execution of an operation.
+ *
+ * <p>If not null the following deprecated options will be ignored:
+ * {@code waitQueueTimeoutMS}, {@code socketTimeoutMS}, {@code wTimeoutMS}, {@code maxTimeMS} and {@code maxCommitTimeMS}</p>
+ *
+ * <ul>
+ *   <li>{@code null} means that the timeout mechanism for operations will defer to using:
+ *     <ul>
+ *       <li>{@code waitQueueTimeoutMS}: The maximum wait time in milliseconds that a thread may wait for a connection to become
+ *       available</li>
+ *       <li>{@code socketTimeoutMS}: How long a send or receive on a socket can take before timing out.</li>
+ *       <li>{@code wTimeoutMS}: How long the server will wait for the write concern to be fulfilled before timing out.</li>
+ *       <li>{@code maxTimeMS}: The cumulative time limit for processing operations on a cursor.
+ *       See: <a href="https://docs.mongodb.com/manual/reference/method/cursor.maxTimeMS">cursor.maxTimeMS</a>.</li>
+ *       <li>{@code maxCommitTimeMS}: The maximum amount of time to allow a single {@code commitTransaction} command to execute.
+ *       See: {@link com.mongodb.TransactionOptions#getMaxCommitTime}.</li>
+ *     </ul>
+ *   </li>
+ *   <li>{@code 0} means infinite timeout.</li>
+ *   <li>{@code > 0} The time limit to use for the full execution of an operation.</li>
+ * </ul>
+ *
+ * @param timeUnit the time unit
+ * @return the timeout in the given time unit
+ * @since 4.x
+ */
+ @Nullable
+ Long getTimeout(TimeUnit timeUnit);
+
/**
* Create a new GridFSBucket instance with a new chunk size in bytes.
*
@@ -111,6 +144,22 @@ public interface GridFSBucket {
*/
GridFSBucket withReadConcern(ReadConcern readConcern);
+ /**
+ * Create a new GridFSBucket instance with the set time limit for the full execution of an operation.
+ *
+ * <ul>
+ *   <li>{@code 0} means infinite timeout.</li>
+ *   <li>{@code > 0} The time limit to use for the full execution of an operation.</li>
+ * </ul>
+ *
+ * @param timeout the timeout, which must be greater than or equal to 0
+ * @param timeUnit the time unit
+ * @return a new GridFSBucket instance with the set time limit for the full execution of an operation
+ * @since 4.x
+ * @see #getTimeout
+ */
+ GridFSBucket withTimeout(long timeout, TimeUnit timeUnit);
+
/**
* Opens a Stream that the application can write the contents of the file to.
*
@@ -296,6 +345,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param id the custom id value of the file
* @param filename the filename for the stream
* @param source the Stream providing the file data
@@ -310,6 +363,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param id the custom id value of the file
* @param filename the filename for the stream
* @param source the Stream providing the file data
@@ -325,6 +382,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param clientSession the client session with which to associate this operation
* @param filename the filename for the stream
* @param source the Stream providing the file data
@@ -341,6 +402,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param clientSession the client session with which to associate this operation
* @param filename the filename for the stream
* @param source the Stream providing the file data
@@ -358,6 +423,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param clientSession the client session with which to associate this operation
* @param id the custom id value of the file
* @param filename the filename for the stream
@@ -374,6 +443,10 @@ public interface GridFSBucket {
* chunks have been uploaded, it creates a files collection document for {@code filename} in the files collection.
*
*
+ * <p>
+ * Note: When this {@link GridFSBucket} is set with an operation timeout (via timeout inherited from {@link MongoDatabase}
+ * settings or {@link #withTimeout(long, TimeUnit)}), timeout breaches may occur due to the {@link InputStream}
+ * lacking inherent read timeout support, which might extend the operation beyond the specified timeout limit.
+ * </p>
* @param clientSession the client session with which to associate this operation
* @param id the custom id value of the file
* @param filename the filename for the stream
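
A short usage sketch of the new timeout accessors (hypothetical class name; assumes the driver-sync `GridFSBuckets` factory):

```java
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;

import java.util.concurrent.TimeUnit;

final class BucketTimeoutSketch {
    public static void main(final String[] args) {
        MongoDatabase database = MongoClients.create("mongodb://localhost").getDatabase("test");
        // The bucket inherits any timeout configured on the client/database;
        // withTimeout narrows it for GridFS operations only (0 = infinite).
        GridFSBucket bucket = GridFSBuckets.create(database, "fs").withTimeout(5, TimeUnit.SECONDS);
        Long timeoutMs = bucket.getTimeout(TimeUnit.MILLISECONDS); // 5000
        System.out.println(timeoutMs);
    }
}
```
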
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java
index f365bd2980a..ca6ca3d30c5 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSBucketImpl.java
@@ -17,6 +17,7 @@
package com.mongodb.client.gridfs;
import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoGridFSException;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
@@ -26,12 +27,17 @@
import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSDownloadOptions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
+import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
@@ -46,14 +52,18 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static com.mongodb.ReadPreference.primary;
+import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
final class GridFSBucketImpl implements GridFSBucket {
private static final int DEFAULT_CHUNKSIZE_BYTES = 255 * 1024;
+ private static final String TIMEOUT_MESSAGE = "GridFS operation timed out";
private final String bucketName;
private final int chunkSizeBytes;
private final MongoCollection<GridFSFile> filesCollection;
@@ -70,6 +80,7 @@ final class GridFSBucketImpl implements GridFSBucket {
getChunksCollection(database, bucketName));
}
+ @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
GridFSBucketImpl(final String bucketName, final int chunkSizeBytes, final MongoCollection<GridFSFile> filesCollection,
final MongoCollection<Document> chunksCollection) {
this.bucketName = notNull("bucketName", bucketName);
@@ -103,6 +114,11 @@ public ReadConcern getReadConcern() {
return filesCollection.getReadConcern();
}
+ @Override
+ public Long getTimeout(final TimeUnit timeUnit) {
+ return filesCollection.getTimeout(timeUnit);
+ }
+
@Override
public GridFSBucket withChunkSizeBytes(final int chunkSizeBytes) {
return new GridFSBucketImpl(bucketName, chunkSizeBytes, filesCollection, chunksCollection);
@@ -126,6 +142,14 @@ public GridFSBucket withReadConcern(final ReadConcern readConcern) {
chunksCollection.withReadConcern(readConcern));
}
+ @Override
+ public GridFSBucket withTimeout(final long timeout, final TimeUnit timeUnit) {
+ isTrueArgument("timeout >= 0", timeout >= 0);
+ notNull("timeUnit", timeUnit);
+ return new GridFSBucketImpl(bucketName, chunkSizeBytes, filesCollection.withTimeout(timeout, timeUnit),
+ chunksCollection.withTimeout(timeout, timeUnit));
+ }
+
@Override
public GridFSUploadStream openUploadStream(final String filename) {
return openUploadStream(new BsonObjectId(), filename);
@@ -176,12 +200,14 @@ public GridFSUploadStream openUploadStream(final ClientSession clientSession, fi
private GridFSUploadStream createGridFSUploadStream(@Nullable final ClientSession clientSession, final BsonValue id,
final String filename, final GridFSUploadOptions options) {
+ Timeout operationTimeout = startTimeout();
notNull("options", options);
Integer chunkSizeBytes = options.getChunkSizeBytes();
int chunkSize = chunkSizeBytes == null ? this.chunkSizeBytes : chunkSizeBytes;
- checkCreateIndex(clientSession);
- return new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, id, filename, chunkSize,
- options.getMetadata());
+ checkCreateIndex(clientSession, operationTimeout);
+ return new GridFSUploadStreamImpl(clientSession, filesCollection,
+ chunksCollection, id, filename, chunkSize,
+ options.getMetadata(), operationTimeout);
}
@Override
@@ -257,7 +283,10 @@ public GridFSDownloadStream openDownloadStream(final ObjectId id) {
@Override
public GridFSDownloadStream openDownloadStream(final BsonValue id) {
- return createGridFSDownloadStream(null, getFileInfoById(null, id));
+ Timeout operationTimeout = startTimeout();
+
+ GridFSFile fileInfo = getFileInfoById(null, id, operationTimeout);
+ return createGridFSDownloadStream(null, fileInfo, operationTimeout);
}
@Override
@@ -267,7 +296,9 @@ public GridFSDownloadStream openDownloadStream(final String filename) {
@Override
public GridFSDownloadStream openDownloadStream(final String filename, final GridFSDownloadOptions options) {
- return createGridFSDownloadStream(null, getFileByName(null, filename, options));
+ Timeout operationTimeout = startTimeout();
+ GridFSFile file = getFileByName(null, filename, options, operationTimeout);
+ return createGridFSDownloadStream(null, file, operationTimeout);
}
@Override
@@ -278,7 +309,9 @@ public GridFSDownloadStream openDownloadStream(final ClientSession clientSession
@Override
public GridFSDownloadStream openDownloadStream(final ClientSession clientSession, final BsonValue id) {
notNull("clientSession", clientSession);
- return createGridFSDownloadStream(clientSession, getFileInfoById(clientSession, id));
+ Timeout operationTimeout = startTimeout();
+ GridFSFile fileInfoById = getFileInfoById(clientSession, id, operationTimeout);
+ return createGridFSDownloadStream(clientSession, fileInfoById, operationTimeout);
}
@Override
@@ -290,11 +323,14 @@ public GridFSDownloadStream openDownloadStream(final ClientSession clientSession
public GridFSDownloadStream openDownloadStream(final ClientSession clientSession, final String filename,
final GridFSDownloadOptions options) {
notNull("clientSession", clientSession);
- return createGridFSDownloadStream(clientSession, getFileByName(clientSession, filename, options));
+ Timeout operationTimeout = startTimeout();
+ GridFSFile file = getFileByName(clientSession, filename, options, operationTimeout);
+ return createGridFSDownloadStream(clientSession, file, operationTimeout);
}
- private GridFSDownloadStream createGridFSDownloadStream(@Nullable final ClientSession clientSession, final GridFSFile gridFSFile) {
- return new GridFSDownloadStreamImpl(clientSession, gridFSFile, chunksCollection);
+ private GridFSDownloadStream createGridFSDownloadStream(@Nullable final ClientSession clientSession, final GridFSFile gridFSFile,
+ @Nullable final Timeout operationTimeout) {
+ return new GridFSDownloadStreamImpl(clientSession, gridFSFile, chunksCollection, operationTimeout);
}
@Override
@@ -365,7 +401,12 @@ public GridFSFindIterable find(final ClientSession clientSession, final Bson fil
}
private GridFSFindIterable createGridFSFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter) {
- return new GridFSFindIterableImpl(createFindIterable(clientSession, filter));
+ return new GridFSFindIterableImpl(createFindIterable(clientSession, filter, startTimeout()));
+ }
+
+ private GridFSFindIterable createGridFSFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter,
+ @Nullable final Timeout operationTimeout) {
+ return new GridFSFindIterableImpl(createFindIterable(clientSession, filter, operationTimeout));
}
@Override
@@ -390,13 +431,18 @@ public void delete(final ClientSession clientSession, final BsonValue id) {
}
private void executeDelete(@Nullable final ClientSession clientSession, final BsonValue id) {
+ Timeout operationTimeout = startTimeout();
DeleteResult result;
if (clientSession != null) {
- result = filesCollection.deleteOne(clientSession, new BsonDocument("_id", id));
- chunksCollection.deleteMany(clientSession, new BsonDocument("files_id", id));
+ result = withNullableTimeout(filesCollection, operationTimeout)
+ .deleteOne(clientSession, new BsonDocument("_id", id));
+ withNullableTimeout(chunksCollection, operationTimeout)
+ .deleteMany(clientSession, new BsonDocument("files_id", id));
} else {
- result = filesCollection.deleteOne(new BsonDocument("_id", id));
- chunksCollection.deleteMany(new BsonDocument("files_id", id));
+ result = withNullableTimeout(filesCollection, operationTimeout)
+ .deleteOne(new BsonDocument("_id", id));
+ withNullableTimeout(chunksCollection, operationTimeout)
+ .deleteMany(new BsonDocument("files_id", id));
}
if (result.wasAcknowledged() && result.getDeletedCount() == 0) {
@@ -426,12 +472,13 @@ public void rename(final ClientSession clientSession, final BsonValue id, final
}
private void executeRename(@Nullable final ClientSession clientSession, final BsonValue id, final String newFilename) {
+ Timeout operationTimeout = startTimeout();
UpdateResult updateResult;
if (clientSession != null) {
- updateResult = filesCollection.updateOne(clientSession, new BsonDocument("_id", id),
+ updateResult = withNullableTimeout(filesCollection, operationTimeout).updateOne(clientSession, new BsonDocument("_id", id),
new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename))));
} else {
- updateResult = filesCollection.updateOne(new BsonDocument("_id", id),
+ updateResult = withNullableTimeout(filesCollection, operationTimeout).updateOne(new BsonDocument("_id", id),
new BsonDocument("$set", new BsonDocument("filename", new BsonString(newFilename))));
}
@@ -442,15 +489,17 @@ private void executeRename(@Nullable final ClientSession clientSession, final Bs
@Override
public void drop() {
- filesCollection.drop();
- chunksCollection.drop();
+ Timeout operationTimeout = startTimeout();
+ withNullableTimeout(filesCollection, operationTimeout).drop();
+ withNullableTimeout(chunksCollection, operationTimeout).drop();
}
@Override
public void drop(final ClientSession clientSession) {
+ Timeout operationTimeout = startTimeout();
notNull("clientSession", clientSession);
- filesCollection.drop(clientSession);
- chunksCollection.drop(clientSession);
+ withNullableTimeout(filesCollection, operationTimeout).drop(clientSession);
+ withNullableTimeout(chunksCollection, operationTimeout).drop(clientSession);
}
private static MongoCollection<GridFSFile> getFilesCollection(final MongoDatabase database, final String bucketName) {
@@ -463,37 +512,45 @@ private static MongoCollection<Document> getChunksCollection(final MongoDatabase
return database.getCollection(bucketName + ".chunks").withCodecRegistry(MongoClientSettings.getDefaultCodecRegistry());
}
- private void checkCreateIndex(@Nullable final ClientSession clientSession) {
+ private void checkCreateIndex(@Nullable final ClientSession clientSession, @Nullable final Timeout operationTimeout) {
if (!checkedIndexes) {
- if (collectionIsEmpty(clientSession, filesCollection.withDocumentClass(Document.class).withReadPreference(primary()))) {
+ if (collectionIsEmpty(clientSession,
+ filesCollection.withDocumentClass(Document.class).withReadPreference(primary()),
+ operationTimeout)) {
+
Document filesIndex = new Document("filename", 1).append("uploadDate", 1);
- if (!hasIndex(clientSession, filesCollection.withReadPreference(primary()), filesIndex)) {
- createIndex(clientSession, filesCollection, filesIndex, new IndexOptions());
+ if (!hasIndex(clientSession, filesCollection.withReadPreference(primary()), filesIndex, operationTimeout)) {
+ createIndex(clientSession, filesCollection, filesIndex, new IndexOptions(), operationTimeout);
}
Document chunksIndex = new Document("files_id", 1).append("n", 1);
- if (!hasIndex(clientSession, chunksCollection.withReadPreference(primary()), chunksIndex)) {
- createIndex(clientSession, chunksCollection, chunksIndex, new IndexOptions().unique(true));
+ if (!hasIndex(clientSession, chunksCollection.withReadPreference(primary()), chunksIndex, operationTimeout)) {
+ createIndex(clientSession, chunksCollection, chunksIndex, new IndexOptions().unique(true), operationTimeout);
}
}
checkedIndexes = true;
}
}
- private boolean collectionIsEmpty(@Nullable final ClientSession clientSession, final MongoCollection<Document> collection) {
+ private boolean collectionIsEmpty(@Nullable final ClientSession clientSession,
+ final MongoCollection<Document> collection,
+ @Nullable final Timeout operationTimeout) {
if (clientSession != null) {
- return collection.find(clientSession).projection(new Document("_id", 1)).first() == null;
+ return withNullableTimeout(collection, operationTimeout)
+ .find(clientSession).projection(new Document("_id", 1)).first() == null;
} else {
- return collection.find().projection(new Document("_id", 1)).first() == null;
+ return withNullableTimeout(collection, operationTimeout)
+ .find().projection(new Document("_id", 1)).first() == null;
}
}
- private boolean hasIndex(@Nullable final ClientSession clientSession, final MongoCollection<Document> collection, final Document index) {
+ private boolean hasIndex(@Nullable final ClientSession clientSession, final MongoCollection<Document> collection,
+ final Document index, @Nullable final Timeout operationTimeout) {
boolean hasIndex = false;
ListIndexesIterable<Document> listIndexesIterable;
if (clientSession != null) {
- listIndexesIterable = collection.listIndexes(clientSession);
+ listIndexesIterable = withNullableTimeout(collection, operationTimeout).listIndexes(clientSession);
} else {
- listIndexesIterable = collection.listIndexes();
+ listIndexesIterable = withNullableTimeout(collection, operationTimeout).listIndexes();
}
ArrayList<Document> indexes = listIndexesIterable.into(new ArrayList<>());
@@ -513,16 +570,16 @@ private boolean hasIndex(@Nullable final ClientSession clientSession, final
}
private void createIndex(@Nullable final ClientSession clientSession, final MongoCollection<Document> collection, final Document index,
- final IndexOptions indexOptions) {
+ final IndexOptions indexOptions, @Nullable final Timeout operationTimeout) {
if (clientSession != null) {
- collection.createIndex(clientSession, index, indexOptions);
+ withNullableTimeout(collection, operationTimeout).createIndex(clientSession, index, indexOptions);
} else {
- collection.createIndex(index, indexOptions);
+ withNullableTimeout(collection, operationTimeout).createIndex(index, indexOptions);
}
}
private GridFSFile getFileByName(@Nullable final ClientSession clientSession, final String filename,
- final GridFSDownloadOptions options) {
+ final GridFSDownloadOptions options, @Nullable final Timeout operationTimeout) {
int revision = options.getRevision();
int skip;
int sort;
@@ -534,7 +591,7 @@ private GridFSFile getFileByName(@Nullable final ClientSession clientSession, fi
sort = -1;
}
- GridFSFile fileInfo = createGridFSFindIterable(clientSession, new Document("filename", filename)).skip(skip)
+ GridFSFile fileInfo = createGridFSFindIterable(clientSession, new Document("filename", filename), operationTimeout).skip(skip)
.sort(new Document("uploadDate", sort)).first();
if (fileInfo == null) {
throw new MongoGridFSException(format("No file found with the filename: %s and revision: %s", filename, revision));
@@ -542,25 +599,30 @@ private GridFSFile getFileByName(@Nullable final ClientSession clientSession, fi
return fileInfo;
}
- private GridFSFile getFileInfoById(@Nullable final ClientSession clientSession, final BsonValue id) {
+ private GridFSFile getFileInfoById(@Nullable final ClientSession clientSession, final BsonValue id,
+ @Nullable final Timeout operationTimeout) {
notNull("id", id);
- GridFSFile fileInfo = createFindIterable(clientSession, new Document("_id", id)).first();
+ GridFSFile fileInfo = createFindIterable(clientSession, new Document("_id", id), operationTimeout).first();
if (fileInfo == null) {
throw new MongoGridFSException(format("No file found with the id: %s", id));
}
return fileInfo;
}
- private FindIterable<GridFSFile> createFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter) {
+ private FindIterable<GridFSFile> createFindIterable(@Nullable final ClientSession clientSession, @Nullable final Bson filter,
+ @Nullable final Timeout operationTimeout) {
FindIterable<GridFSFile> findIterable;
if (clientSession != null) {
- findIterable = filesCollection.find(clientSession);
+ findIterable = withNullableTimeout(filesCollection, operationTimeout).find(clientSession);
} else {
- findIterable = filesCollection.find();
+ findIterable = withNullableTimeout(filesCollection, operationTimeout).find();
}
if (filter != null) {
findIterable = findIterable.filter(filter);
}
+ if (filesCollection.getTimeout(MILLISECONDS) != null) {
+ findIterable.timeoutMode(TimeoutMode.CURSOR_LIFETIME);
+ }
return findIterable;
}
@@ -572,6 +634,8 @@ private void downloadToStream(final GridFSDownloadStream downloadStream, final O
while ((len = downloadStream.read(buffer)) != -1) {
destination.write(buffer, 0, len);
}
+ } catch (MongoOperationTimeoutException e) { // TODO (CSOT) - JAVA-5248 Update to MongoOperationTimeoutException
+ throw e;
} catch (IOException e) {
savedThrowable = new MongoGridFSException("IOException when reading from the OutputStream", e);
} catch (Exception e) {
@@ -587,4 +651,14 @@ private void downloadToStream(final GridFSDownloadStream downloadStream, final O
}
}
}
+
+ private static <T> MongoCollection<T> withNullableTimeout(final MongoCollection<T> collection,
+ @Nullable final Timeout timeout) {
+ return CollectionTimeoutHelper.collectionWithTimeout(collection, TIMEOUT_MESSAGE, timeout);
+ }
+
+ @Nullable
+ private Timeout startTimeout() {
+ return TimeoutContext.calculateTimeout(filesCollection.getTimeout(MILLISECONDS));
+ }
}
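
The pattern `GridFSBucketImpl` now follows, reduced to a standalone sketch: one `Timeout` is started per public operation, and every collection call is rescoped to the remaining budget, so a second command only receives whatever time the first one left behind. Names here are illustrative; it assumes the CSOT branch's `MongoCollection#getTimeout` and a caller in the same package as `CollectionTimeoutHelper`:

```java
import com.mongodb.client.MongoCollection;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.time.Timeout;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.Document;
import org.bson.types.ObjectId;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class PerOperationTimeoutSketch {
    private static final String TIMEOUT_MESSAGE = "GridFS operation timed out";

    // Both deletes share one deadline derived from the files collection's timeoutMS.
    static void deleteFile(final MongoCollection<GridFSFile> filesCollection,
                           final MongoCollection<Document> chunksCollection,
                           final ObjectId id) {
        Timeout operationTimeout = TimeoutContext.calculateTimeout(filesCollection.getTimeout(MILLISECONDS));
        BsonObjectId bsonId = new BsonObjectId(id);
        CollectionTimeoutHelper.collectionWithTimeout(filesCollection, TIMEOUT_MESSAGE, operationTimeout)
                .deleteOne(new BsonDocument("_id", bsonId));
        CollectionTimeoutHelper.collectionWithTimeout(chunksCollection, TIMEOUT_MESSAGE, operationTimeout)
                .deleteMany(new BsonDocument("files_id", bsonId));
    }
}
```
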
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java
index 16f0bcd7fd3..e744607a0ad 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java
@@ -16,12 +16,15 @@
package com.mongodb.client.gridfs;
+import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoGridFSException;
import com.mongodb.client.ClientSession;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
+import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import org.bson.BsonValue;
import org.bson.Document;
@@ -35,10 +38,14 @@
import static java.lang.String.format;
class GridFSDownloadStreamImpl extends GridFSDownloadStream {
+ private static final String TIMEOUT_MESSAGE = "The GridFS download stream has timed out";
private final ClientSession clientSession;
private final GridFSFile fileInfo;
private final MongoCollection<Document> chunksCollection;
private final BsonValue fileId;
+ /**
+ * The length, in bytes, of the file to download.
+ */
private final long length;
private final int chunkSizeInBytes;
private final int numberOfChunks;
@@ -46,16 +53,20 @@ class GridFSDownloadStreamImpl extends GridFSDownloadStream {
private int batchSize;
private int chunkIndex;
private int bufferOffset;
+ /**
+ * Current byte position in the file.
+ */
private long currentPosition;
private byte[] buffer = null;
private long markPosition;
-
+ @Nullable
+ private final Timeout timeout;
private final ReentrantLock closeLock = new ReentrantLock();
private final ReentrantLock cursorLock = new ReentrantLock();
private boolean closed = false;
GridFSDownloadStreamImpl(@Nullable final ClientSession clientSession, final GridFSFile fileInfo,
- final MongoCollection<Document> chunksCollection) {
+ final MongoCollection<Document> chunksCollection, @Nullable final Timeout timeout) {
this.clientSession = clientSession;
this.fileInfo = notNull("file information", fileInfo);
this.chunksCollection = notNull("chunks collection", chunksCollection);
@@ -64,6 +75,7 @@ class GridFSDownloadStreamImpl extends GridFSDownloadStream {
length = fileInfo.getLength();
chunkSizeInBytes = fileInfo.getChunkSize();
numberOfChunks = (int) Math.ceil((double) length / chunkSizeInBytes);
+ this.timeout = timeout;
}
@Override
@@ -97,6 +109,7 @@ public int read(final byte[] b) {
@Override
public int read(final byte[] b, final int off, final int len) {
checkClosed();
+ checkTimeout();
if (currentPosition == length) {
return -1;
@@ -118,6 +131,7 @@ public int read(final byte[] b, final int off, final int len) {
@Override
public long skip(final long bytesToSkip) {
checkClosed();
+ checkTimeout();
if (bytesToSkip <= 0) {
return 0;
}
@@ -146,6 +160,7 @@ public long skip(final long bytesToSkip) {
@Override
public int available() {
checkClosed();
+ checkTimeout();
if (buffer == null) {
return 0;
} else {
@@ -166,6 +181,7 @@ public void mark(final int readlimit) {
@Override
public void reset() {
checkClosed();
+ checkTimeout();
if (currentPosition == markPosition) {
return;
}
@@ -195,6 +211,12 @@ public void close() {
});
}
+ private void checkTimeout() {
+ if (timeout != null && timeout.hasExpired()) {
+ // TODO (CSOT) - JAVA-5248 Update to MongoOperationTimeoutException
+ throw new MongoOperationTimeoutException(TIMEOUT_MESSAGE);
+ }
+ }
+
private void checkClosed() {
withInterruptibleLock(closeLock, () -> {
if (closed) {
@@ -236,11 +258,15 @@ private MongoCursor<Document> getCursor(final int startChunkIndex) {
FindIterable<Document> findIterable;
Document filter = new Document("files_id", fileId).append("n", new Document("$gte", startChunkIndex));
if (clientSession != null) {
- findIterable = chunksCollection.find(clientSession, filter);
+ findIterable = withNullableTimeout(chunksCollection, timeout).find(clientSession, filter);
} else {
- findIterable = chunksCollection.find(filter);
+ findIterable = withNullableTimeout(chunksCollection, timeout).find(filter);
}
- return findIterable.batchSize(batchSize).sort(new Document("n", 1)).iterator();
+ if (timeout != null) {
+ findIterable.timeoutMode(TimeoutMode.CURSOR_LIFETIME);
+ }
+ return findIterable.batchSize(batchSize)
+ .sort(new Document("n", 1)).iterator();
}
private byte[] getBufferFromChunk(@Nullable final Document chunk, final int expectedChunkIndex) {
@@ -279,4 +305,9 @@ private byte[] getBufferFromChunk(@Nullable final Document chunk, final int expe
private byte[] getBuffer(final int chunkIndexToFetch) {
return getBufferFromChunk(getChunk(chunkIndexToFetch), chunkIndexToFetch);
}
+
+ private MongoCollection<Document> withNullableTimeout(final MongoCollection<Document> chunksCollection,
+ @Nullable final Timeout timeout) {
+ return CollectionTimeoutHelper.collectionWithTimeout(chunksCollection, TIMEOUT_MESSAGE, timeout);
+ }
}
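
The download stream's deadline guard, reduced to a minimal sketch (hypothetical class; it mirrors the `checkTimeout` added above, which runs at the top of `read`, `skip`, `available`, and `reset` so a long-lived stream fails fast once the operation deadline passes):

```java
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;

final class DeadlineGuardSketch {
    @Nullable
    private final Timeout timeout;

    DeadlineGuardSketch(@Nullable final Timeout timeout) {
        this.timeout = timeout;
    }

    // A null timeout means CSOT is not configured and the guard is a no-op.
    void checkTimeout() {
        if (timeout != null && timeout.hasExpired()) {
            throw new MongoOperationTimeoutException("The GridFS download stream has timed out");
        }
    }
}
```
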
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterable.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterable.java
index ab3c1540896..9b8cb8b9117 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterable.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterable.java
@@ -16,10 +16,7 @@
package com.mongodb.client.gridfs;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
-import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Collation;
import com.mongodb.lang.Nullable;
@@ -118,25 +115,4 @@ public interface GridFSFindIterable extends MongoIterable<GridFSFile> {
* @mongodb.server.release 3.4
*/
GridFSFindIterable collation(@Nullable Collation collation);
-
- /**
- * Sets the timeoutMode for the cursor.
- *
- * <p>
- * Requires the {@code timeout} to be set, either in the {@link com.mongodb.MongoClientSettings},
- * via {@link MongoDatabase} or via {@link MongoCollection}
- * </p>
- * <p>
- * If the {@code timeout} is set then:
- * <ul>
- * <li>For non-tailable cursors, the default value of timeoutMode is {@link TimeoutMode#CURSOR_LIFETIME}</li>
- * <li>For tailable cursors, the default value of timeoutMode is {@link TimeoutMode#ITERATION} and its an error
- * to configure it as: {@link TimeoutMode#CURSOR_LIFETIME}</li>
- * </ul>
- * </p>
- * @param timeoutMode the timeout mode
- * @return this
- * @since CSOT
- */
- GridFSFindIterable timeoutMode(TimeoutMode timeoutMode);
}
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterableImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterableImpl.java
index bf10f3b6ee2..95737b70e35 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterableImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSFindIterableImpl.java
@@ -20,7 +20,6 @@
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
-import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Collation;
import com.mongodb.lang.Nullable;
@@ -74,12 +73,6 @@ public GridFSFindIterable batchSize(final int batchSize) {
return this;
}
- @Override
- public GridFSFindIterable timeoutMode(final TimeoutMode timeoutMode) {
- underlying.timeoutMode(timeoutMode);
- return this;
- }
-
@Override
public GridFSFindIterable collation(@Nullable final Collation collation) {
underlying.collation(collation);
diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java
index ff359e34781..e722f3a4c0a 100644
--- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java
@@ -20,6 +20,8 @@
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import org.bson.BsonValue;
import org.bson.Document;
@@ -33,6 +35,7 @@
import static com.mongodb.internal.Locks.withInterruptibleLock;
final class GridFSUploadStreamImpl extends GridFSUploadStream {
+ public static final String TIMEOUT_MESSAGE = "The GridFS upload stream has timed out";
private final ClientSession clientSession;
private final MongoCollection<GridFSFile> filesCollection;
private final MongoCollection<Document> chunksCollection;
@@ -44,13 +47,14 @@ final class GridFSUploadStreamImpl extends GridFSUploadStream {
private long lengthInBytes;
private int bufferOffset;
private int chunkIndex;
-
+ @Nullable
+ private final Timeout timeout;
private final ReentrantLock closeLock = new ReentrantLock();
private boolean closed = false;
GridFSUploadStreamImpl(@Nullable final ClientSession clientSession, final MongoCollection<GridFSFile> filesCollection,
final MongoCollection<Document> chunksCollection, final BsonValue fileId, final String filename,
- final int chunkSizeBytes, @Nullable final Document metadata) {
+ final int chunkSizeBytes, @Nullable final Document metadata, @Nullable final Timeout timeout) {
this.clientSession = clientSession;
this.filesCollection = notNull("files collection", filesCollection);
this.chunksCollection = notNull("chunks collection", chunksCollection);
@@ -61,6 +65,7 @@ final class GridFSUploadStreamImpl extends GridFSUploadStream {
chunkIndex = 0;
bufferOffset = 0;
buffer = new byte[chunkSizeBytes];
+ this.timeout = timeout;
}
@Override
@@ -84,9 +89,11 @@ public void abort() {
});
if (clientSession != null) {
- chunksCollection.deleteMany(clientSession, new Document("files_id", fileId));
+ withNullableTimeout(chunksCollection, timeout)
+ .deleteMany(clientSession, new Document("files_id", fileId));
} else {
- chunksCollection.deleteMany(new Document("files_id", fileId));
+ withNullableTimeout(chunksCollection, timeout)
+ .deleteMany(new Document("files_id", fileId));
}
}
@@ -105,6 +112,7 @@ public void write(final byte[] b) {
@Override
public void write(final byte[] b, final int off, final int len) {
checkClosed();
+ checkTimeout();
notNull("b", b);
if ((off < 0) || (off > b.length) || (len < 0)
@@ -136,6 +144,12 @@ public void write(final byte[] b, final int off, final int len) {
}
}
+ private void checkTimeout() {
+ if (timeout != null && timeout.hasExpired()) {
+ throw TimeoutContext.createMongoTimeoutException(TIMEOUT_MESSAGE);
+ }
+ }
+
@Override
public void close() {
boolean alreadyClosed = withInterruptibleLock(closeLock, () -> {
@@ -150,9 +164,9 @@ public void close() {
GridFSFile gridFSFile = new GridFSFile(fileId, filename, lengthInBytes, chunkSizeBytes, new Date(),
metadata);
if (clientSession != null) {
- filesCollection.insertOne(clientSession, gridFSFile);
+ withNullableTimeout(filesCollection, timeout).insertOne(clientSession, gridFSFile);
} else {
- filesCollection.insertOne(gridFSFile);
+ withNullableTimeout(filesCollection, timeout).insertOne(gridFSFile);
}
buffer = null;
}
@@ -160,10 +174,12 @@ public void close() {
private void writeChunk() {
if (bufferOffset > 0) {
if (clientSession != null) {
- chunksCollection.insertOne(clientSession, new Document("files_id", fileId).append("n", chunkIndex)
- .append("data", getData()));
+ withNullableTimeout(chunksCollection, timeout)
+ .insertOne(clientSession, new Document("files_id", fileId).append("n", chunkIndex).append("data", getData()));
} else {
- chunksCollection.insertOne(new Document("files_id", fileId).append("n", chunkIndex).append("data", getData()));
+ withNullableTimeout(chunksCollection, timeout)
+ .insertOne(new Document("files_id", fileId)
+ .append("n", chunkIndex).append("data", getData()));
}
chunkIndex++;
bufferOffset = 0;
@@ -186,4 +202,9 @@ private void checkClosed() {
}
});
}
+
+ private static <T> MongoCollection<T> withNullableTimeout(final MongoCollection<T> collection,
+ @Nullable final Timeout timeout) {
+ return CollectionTimeoutHelper.collectionWithTimeout(collection, TIMEOUT_MESSAGE, timeout);
+ }
}
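
Putting the upload-side changes together, a hedged usage sketch (hypothetical class and file name): the deadline is captured when the stream is opened, and the index checks, every chunk insert, the final files-collection insert, and `abort()` cleanup all draw from that same budget:

```java
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSUploadStream;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

final class UploadTimeoutSketch {
    static void upload(final GridFSBucket bucket) {
        try (GridFSUploadStream stream = bucket.withTimeout(30, TimeUnit.SECONDS)
                .openUploadStream("example.txt")) {
            stream.write("hello gridfs".getBytes(StandardCharsets.UTF_8)); // write() calls checkTimeout()
        } // close() inserts the files document under the same deadline
    }
}
```
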
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionDeadlockTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionDeadlockTest.java
index ef965f0ae95..2ac985f21a6 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionDeadlockTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionDeadlockTest.java
@@ -195,11 +195,11 @@ public void shouldPassAllOutcomes(final int maxPoolSize,
}
private void assertEventEquality(final TestCommandListener commandListener, final List<ExpectedEvent> expectedStartEvents) {
- List<CommandEvent> actualStartedEvents = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> actualStartedEvents = commandListener.getCommandStartedEvents();
assertEquals(expectedStartEvents.size(), actualStartedEvents.size());
for (int i = 0; i < expectedStartEvents.size(); i++) {
ExpectedEvent expectedEvent = expectedStartEvents.get(i);
- CommandStartedEvent actualEvent = (CommandStartedEvent) actualStartedEvents.get(i);
+ CommandStartedEvent actualEvent = actualStartedEvents.get(i);
assertEquals(expectedEvent.getDatabase(), actualEvent.getDatabaseName(), "Database name");
assertEquals(expectedEvent.getCommandName(), actualEvent.getCommandName(), "Command name");
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionTest.java
index 9c14640cb4b..8d5fccaef98 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideEncryptionTest.java
@@ -26,6 +26,7 @@
import com.mongodb.client.model.ValidationOptions;
import com.mongodb.client.test.CollectionHelper;
import com.mongodb.event.CommandEvent;
+import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
@@ -325,7 +326,7 @@ public void shouldPassAllOutcomes() {
if (definition.containsKey("expectations")) {
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), "default", null);
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
assertEventsEquality(expectedEvents, events);
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
index 1fee5d1d70b..392cd1e81bf 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
@@ -16,6 +16,7 @@
package com.mongodb.client;
+import com.mongodb.ClusterFixture;
import com.mongodb.ConnectionString;
import com.mongodb.CursorType;
import com.mongodb.MongoClientSettings;
@@ -25,11 +26,15 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSDownloadStream;
+import com.mongodb.client.gridfs.GridFSUploadStream;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.test.CollectionHelper;
-import com.mongodb.event.CommandEvent;
+import com.mongodb.event.CommandStartedEvent;
+import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.internal.connection.ServerHelper;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.test.FlakyTest;
@@ -38,12 +43,15 @@
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.codecs.BsonDocumentCodec;
+import org.bson.types.ObjectId;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -53,6 +61,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
@@ -61,6 +70,7 @@
import static com.mongodb.ClusterFixture.sleep;
import static com.mongodb.client.Fixture.getDefaultDatabaseName;
import static com.mongodb.client.Fixture.getPrimary;
+import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -76,12 +86,127 @@
*/
public abstract class AbstractClientSideOperationsTimeoutProseTest {
+ private static final String GRID_FS_BUCKET_NAME = "db.fs";
+ private static final String GRID_FS_COLLECTION_NAME_CHUNKS = GRID_FS_BUCKET_NAME + ".chunks";
+ private static final String GRID_FS_COLLECTION_NAME_FILE = GRID_FS_BUCKET_NAME + ".files";
+ private final MongoNamespace namespace = new MongoNamespace(getDefaultDatabaseName(), this.getClass().getSimpleName());
private static final AtomicInteger COUNTER = new AtomicInteger();
private TestCommandListener commandListener;
@Nullable
private CollectionHelper<BsonDocument> collectionHelper;
-
+ private CollectionHelper<BsonDocument> filesCollectionHelper;
+ private CollectionHelper<BsonDocument> chunksCollectionHelper;
protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings);
+ protected abstract GridFSBucket createGridFsBucket(MongoDatabase mongoDatabase, String bucketName);
+
+ @Tag("setsFailPoint")
+ @DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out")
+ @Disabled("TODO (CSOT) - JAVA-4057")
+ @Test
+ public void testGridFSUploadViaOpenUploadStreamTimeout() {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ long rtt = ClusterFixture.getPrimaryRTT();
+
+ collectionHelper.runAdminCommand("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 1 },"
+ + " data: {"
+ + " failCommands: [\"insert\"],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + (rtt + 100)
+ + " }"
+ + "}");
+ try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
+ .timeout(rtt + 95, TimeUnit.MILLISECONDS))) {
+ MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
+ GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME);
+
+ try (GridFSUploadStream uploadStream = gridFsBucket.openUploadStream("filename")) {
+ uploadStream.write(0x12);
+ assertThrows(MongoOperationTimeoutException.class, uploadStream::close);
+ }
+ }
+ }
+
+ @Tag("setsFailPoint")
+ @Disabled("TODO (CSOT) - JAVA-4057")
+ @DisplayName("6. GridFS Upload - Aborting an upload stream can be timed out")
+ @Test
+ public void testAbortingGridFsUploadStreamTimeout() {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ long rtt = ClusterFixture.getPrimaryRTT();
+
+ collectionHelper.runAdminCommand("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 1 },"
+ + " data: {"
+ + " failCommands: [\"delete\"],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + (rtt + 100)
+ + " }"
+ + "}");
+ try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
+ .timeout(rtt + 95, TimeUnit.MILLISECONDS))) {
+ MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
+ GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2);
+
+ try (GridFSUploadStream uploadStream = gridFsBucket.openUploadStream("filename")) {
+ uploadStream.write(new byte[]{0x01, 0x02, 0x03, 0x04});
+ assertThrows(MongoOperationTimeoutException.class, uploadStream::abort);
+ }
+ }
+ }
+
+ @Tag("setsFailPoint")
+ @DisplayName("6. GridFS Download")
+ @Test
+ public void testGridFsDownloadStreamTimeout() {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ long rtt = ClusterFixture.getPrimaryRTT();
+
+ try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
+ .timeout(rtt + 100, TimeUnit.MILLISECONDS))) {
+ MongoDatabase database = client.getDatabase(namespace.getDatabaseName());
+ GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2);
+ filesCollectionHelper.insertDocuments(asList(BsonDocument.parse(
+ "{"
+ + " _id: {"
+ + " $oid: \"000000000000000000000005\""
+ + " },"
+ + " length: 10,"
+ + " chunkSize: 4,"
+ + " uploadDate: {"
+ + " $date: \"1970-01-01T00:00:00.000Z\""
+ + " },"
+ + " md5: \"57d83cd477bfb1ccd975ab33d827a92b\","
+ + " filename: \"length-10\","
+ + " contentType: \"application/octet-stream\","
+ + " aliases: [],"
+ + " metadata: {}"
+ + "}"
+ )), WriteConcern.MAJORITY);
+
+ try (GridFSDownloadStream downloadStream = gridFsBucket.openDownloadStream(new ObjectId("000000000000000000000005"))) {
+ collectionHelper.runAdminCommand("{"
+ + " configureFailPoint: \"failCommand\","
+ + " mode: { times: 1 },"
+ + " data: {"
+ + " failCommands: [\"find\"],"
+ + " blockConnection: true,"
+ + " blockTimeMS: " + (rtt + 95)
+ + " }"
+ + "}");
+ assertThrows(MongoOperationTimeoutException.class, downloadStream::read);
+
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> findCommands = events.stream().filter(e -> e.getCommandName().equals("find")).collect(Collectors.toList());
+
+ assertEquals(2, findCommands.size());
+ assertEquals(GRID_FS_COLLECTION_NAME_FILE, findCommands.get(0).getCommand().getString("find").getValue());
+ assertEquals(GRID_FS_COLLECTION_NAME_CHUNKS, findCommands.get(1).getCommand().getString("find").getValue());
+ }
+ }
+ }
protected abstract boolean isAsync();
@@ -118,7 +243,7 @@ public void testBlockingIterationMethodsTailableCursor() {
assertThrows(MongoOperationTimeoutException.class, cursor::next);
}
- List<CommandEvent> events = commandListener.getCommandSucceededEvents();
+ List<CommandSucceededEvent> events = commandListener.getCommandSucceededEvents();
assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("find")).count());
long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count();
assertTrue(getMoreCount <= 2, "getMoreCount expected to be less than or equal to two but was: " + getMoreCount);
@@ -168,7 +293,7 @@ public void testBlockingIterationMethodsChangeStream() {
assertEquals(1, fullDocument.get("x"));
assertThrows(MongoOperationTimeoutException.class, cursor::next);
}
- List<CommandEvent> events = commandListener.getCommandSucceededEvents();
+ List<CommandSucceededEvent> events = commandListener.getCommandSucceededEvents();
assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("aggregate")).count());
long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count();
assertTrue(getMoreCount <= 2, "getMoreCount expected to be less than or equal to two but was: " + getMoreCount);
@@ -228,18 +353,23 @@ private MongoClientSettings.Builder getMongoClientSettingsBuilder() {
@BeforeEach
public void setUp() {
+ collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), namespace);
+ filesCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(),
+ new MongoNamespace(getDefaultDatabaseName(), GRID_FS_COLLECTION_NAME_FILE));
+ chunksCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(),
+ new MongoNamespace(getDefaultDatabaseName(), GRID_FS_COLLECTION_NAME_CHUNKS));
commandListener = new TestCommandListener();
}
@AfterEach
public void tearDown(final TestInfo info) {
- if (collectionHelper != null) {
- if (info.getTags().contains("usesFailPoint")) {
- collectionHelper.runAdminCommand("{configureFailPoint: \"failCommand\", mode: \"off\"}");
- }
- CollectionHelper.dropDatabase(getDefaultDatabaseName());
+ if (info.getTags().contains("setsFailPoint") && serverVersionAtLeast(4, 4)) {
+ collectionHelper.runAdminCommand("{configureFailPoint: \"failCommand\", mode: \"off\"}");
}
+ collectionHelper.drop();
+ filesCollectionHelper.drop();
+ chunksCollectionHelper.drop();
try {
ServerHelper.checkPool(getPrimary());
} catch (InterruptedException e) {
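
The fail-point choreography in the prose tests above follows one shape: block a single named command for longer than the client's budget, assert MongoOperationTimeoutException, then rely on tearDown to switch the fail point off. A self-contained sketch of that shape (the 200 ms figure and the admin-database wiring are illustrative, not from this patch):

    MongoDatabase admin = client.getDatabase("admin");
    // Block the next matching command for longer than the configured timeoutMS.
    admin.runCommand(Document.parse(
            "{configureFailPoint: 'failCommand', mode: {times: 1},"
            + " data: {failCommands: ['insert'], blockConnection: true, blockTimeMS: 200}}"));
    try {
        // run the operation expected to throw MongoOperationTimeoutException
    } finally {
        // Mirrors what tearDown() does for @Tag("setsFailPoint") tests.
        admin.runCommand(Document.parse("{configureFailPoint: 'failCommand', mode: 'off'}"));
    }
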
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractCrudTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractCrudTest.java
index fe78841bec4..634f9946ea3 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractCrudTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractCrudTest.java
@@ -22,6 +22,7 @@
import com.mongodb.client.test.CollectionHelper;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandListener;
+import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
@@ -152,7 +153,7 @@ public void shouldPassAllOutcomes() {
}
if (definition.containsKey("expectations")) {
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
assertEventsEquality(expectedEvents, events.subList(0, expectedEvents.size()));
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java
index 1df7174e246..34620130a40 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java
@@ -28,6 +28,7 @@
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.test.CollectionHelper;
import com.mongodb.event.CommandEvent;
+import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import org.bson.BsonArray;
import org.bson.BsonBinary;
@@ -235,7 +236,7 @@ public void shouldPassAllOutcomes() {
if (definition.containsKey("expectations")) {
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
- List<CommandEvent> events = commandListener.waitForStartedEvents(expectedEvents.size());
+ List<CommandStartedEvent> events = commandListener.waitForStartedEvents(expectedEvents.size());
assertEventsEquality(expectedEvents, events);
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractServerSelectionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractServerSelectionProseTest.java
index 894d291a743..0aa2ff28536 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractServerSelectionProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractServerSelectionProseTest.java
@@ -18,7 +18,7 @@
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
-import com.mongodb.event.CommandEvent;
+import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
@@ -133,7 +133,7 @@ private static Map<ServerAddress, Long> doSelections(final MongoCollection<Document> collection,
for (Future<?> result : results) {
result.get();
}
- List<CommandEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
assertEquals(tasks * opsPerTask, commandStartedEvents.size());
return commandStartedEvents.stream()
.collect(groupingBy(event -> event.getConnectionDescription().getServerAddress()))
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java
index cbfab4725bc..ac5aacd9ca2 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java
@@ -393,7 +393,7 @@ public void shouldPassAllOutcomes() {
if (definition.containsKey("expectations")) {
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), databaseName, null);
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
assertTrue("Actual number of events is less than expected number of events", events.size() >= expectedEvents.size());
assertEventsEquality(expectedEvents, events.subList(0, expectedEvents.size()), lsidMap);
@@ -570,7 +570,7 @@ private void executeOperations(final BsonArray operations, final boolean throwEx
} else if (operationName.equals("assertDifferentLsidOnLastTwoCommands")) {
List<CommandEvent> events = lastTwoCommandEvents();
String eventsJson = commandListener.getCommandStartedEvents().stream()
- .map(e -> ((CommandStartedEvent) e).getCommand().toJson())
+ .map(e -> e.getCommand().toJson())
.collect(Collectors.joining(", "));
assertNotEquals(eventsJson, ((CommandStartedEvent) events.get(0)).getCommand().getDocument("lsid"),
@@ -578,7 +578,7 @@ private void executeOperations(final BsonArray operations, final boolean throwEx
} else if (operationName.equals("assertSameLsidOnLastTwoCommands")) {
List<CommandEvent> events = lastTwoCommandEvents();
String eventsJson = commandListener.getCommandStartedEvents().stream()
- .map(e -> ((CommandStartedEvent) e).getCommand().toJson())
+ .map(e -> e.getCommand().toJson())
.collect(Collectors.joining(", "));
assertEquals(eventsJson, ((CommandStartedEvent) events.get(0)).getCommand().getDocument("lsid"),
((CommandStartedEvent) events.get(1)).getCommand().getDocument("lsid"));
@@ -723,9 +723,9 @@ private boolean assertExceptionState(final Exception e, final BsonValue expected
}
private List<CommandEvent> lastTwoCommandEvents() {
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
assertTrue(events.size() >= 2);
- return events.subList(events.size() - 2, events.size());
+ return new ArrayList<>(events.subList(events.size() - 2, events.size()));
}
private TransactionOptions createTransactionOptions(final BsonDocument options) {
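
Why the copy through new ArrayList<>(...) in lastTwoCommandEvents: Java generics are invariant, so a subList view of a List<CommandStartedEvent> cannot be returned as the method's wider List<CommandEvent>. The copy lets the constructor's Collection<? extends CommandEvent> bound do the widening:

    List<CommandStartedEvent> started = commandListener.getCommandStartedEvents();
    // A List<CommandStartedEvent> is not a List<CommandEvent>; the view must be copied
    // into a freshly inferred ArrayList<CommandEvent> before it can be returned.
    List<CommandEvent> lastTwo = new ArrayList<>(started.subList(started.size() - 2, started.size()));
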
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java
index 6c8e6ec6c74..fc80e2f1139 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java
@@ -17,6 +17,8 @@
package com.mongodb.client;
import com.mongodb.MongoClientSettings;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
/**
@@ -29,6 +31,11 @@ protected MongoClient createMongoClient(final MongoClientSettings mongoClientSet
return MongoClients.create(mongoClientSettings);
}
+ @Override
+ protected GridFSBucket createGridFsBucket(final MongoDatabase mongoDatabase, final String bucketName) {
+ return GridFSBuckets.create(mongoDatabase, bucketName);
+ }
+
@Override
protected boolean isAsync() {
return false;
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java
index 3adaec1916b..22812b30d18 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java
@@ -86,13 +86,25 @@ public static void checkSkipCSOTTest(final String fileDescription, final String
assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("legacy timeouts behave correctly for retryable operations"));
assumeFalse("TODO (CSOT) - JAVA-4063", testDescription.contains("RTT"));
- assumeFalse("TODO (CSOT) - JAVA-4059", fileDescription.contains("GridFS"));
assumeFalse("TODO (CSOT) - JAVA-5248",
fileDescription.equals("MaxTimeMSExpired server errors are transformed into a custom timeout error"));
assumeFalse("TODO (CSOT) - JAVA-4062", testDescription.contains("wTimeoutMS is ignored"));
+
+ if (fileDescription.contains("GridFS")) { //TODO (CSOT) - JAVA-4057
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("chunk insertion"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("creation of files document"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("delete against the files collection"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("delete against the chunks collection"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("overridden for a rename"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("update during a rename"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("collection drop"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("drop as a whole"));
+ assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.contains("entire delete"));
+ }
+
assumeFalse("TODO (CSOT) - JAVA-4057", testDescription.equals("maxTimeMS value in the command is less than timeoutMS"));
assumeFalse("TODO (CSOT) - JAVA-4057", fileDescription.contains("bulkWrite") || testDescription.contains("bulkWrite"));
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ReadConcernTest.java b/driver-sync/src/test/functional/com/mongodb/client/ReadConcernTest.java
index 6521fa67010..a40b258ea6c 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ReadConcernTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ReadConcernTest.java
@@ -17,7 +17,6 @@
package com.mongodb.client;
import com.mongodb.ReadConcern;
-import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.internal.connection.TestCommandListener;
import org.bson.BsonDocument;
@@ -64,7 +63,7 @@ public void shouldIncludeReadConcernInCommand() {
mongoClient.getDatabase(getDefaultDatabaseName()).getCollection("test")
.withReadConcern(ReadConcern.LOCAL).find().into(new ArrayList<>());
- List<CommandEvent> events = commandListener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = commandListener.getCommandStartedEvents();
BsonDocument commandDocument = new BsonDocument("find", new BsonString("test"))
.append("readConcern", ReadConcern.LOCAL.asDocument())
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java
index 11f12d4cee6..597a20cfe9e 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java
@@ -79,7 +79,6 @@
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.lang.NonNull;
-import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWriter;
@@ -109,7 +108,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
-final class UnifiedCrudHelper {
+final class UnifiedCrudHelper extends UnifiedHelper {
private final Entities entities;
private final String testDescription;
private final AtomicInteger uniqueIdGenerator = new AtomicInteger();
@@ -1733,7 +1732,6 @@ private MongoCollection<BsonDocument> getMongoCollection(final BsonDocument oper
}
return collection;
}
-
private MongoDatabase getMongoDatabase(final BsonDocument operation) {
MongoDatabase database = entities.getDatabase(operation.getString("object").getValue());
if (operation.containsKey("arguments")) {
@@ -1747,16 +1745,6 @@ private MongoDatabase getMongoDatabase(final BsonDocument operation) {
return database;
}
- @Nullable
- private Long getAndRemoveTimeoutMS(final BsonDocument arguments) {
- Long timeoutMS = null;
- if (arguments.containsKey("timeoutMS")) {
- timeoutMS = arguments.getNumber("timeoutMS").longValue();
- arguments.remove("timeoutMS");
- }
- return timeoutMS;
- }
-
private static void setCursorType(final FindIterable<BsonDocument> iterable, final Map.Entry<String, BsonValue> cur) {
switch (cur.getValue().asString().getValue()) {
case "tailable":
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedGridFSHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedGridFSHelper.java
index 179d88a4a32..13e95a58463 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedGridFSHelper.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedGridFSHelper.java
@@ -17,7 +17,9 @@
package com.mongodb.client.unified;
import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSDownloadOptions;
+import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.internal.HexUtils;
import org.bson.BsonDocument;
@@ -32,19 +34,55 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
-final class UnifiedGridFSHelper {
+final class UnifiedGridFSHelper extends UnifiedHelper {
private final Entities entities;
UnifiedGridFSHelper(final Entities entities) {
this.entities = entities;
}
+ public OperationResult executeFind(final BsonDocument operation) {
+ GridFSFindIterable iterable = createGridFSFindIterable(operation);
+ try {
+ ArrayList<GridFSFile> target = new ArrayList<>();
+ iterable.into(target);
+
+ if (target.isEmpty()) {
+ return OperationResult.NONE;
+ }
+
+ throw new UnsupportedOperationException("expectResult is not implemented for Unified GridFS tests. "
+ + "Unexpected result: " + target);
+ } catch (Exception e) {
+ return OperationResult.of(e);
+ }
+ }
+
+ public OperationResult executeRename(final BsonDocument operation) {
+ GridFSBucket bucket = getGridFsBucket(operation);
+ BsonDocument arguments = operation.getDocument("arguments");
+ BsonValue id = arguments.get("id");
+ String fileName = arguments.get("newFilename").asString().getValue();
+
+ requireNonNull(id);
+ requireNonNull(fileName);
+
+ try {
+ bucket.rename(id, fileName);
+ return OperationResult.NONE;
+ } catch (Exception e) {
+ return OperationResult.of(e);
+ }
+ }
+
OperationResult executeDelete(final BsonDocument operation) {
- GridFSBucket bucket = entities.getBucket(operation.getString("object").getValue());
+ GridFSBucket bucket = getGridFsBucket(operation);
BsonDocument arguments = operation.getDocument("arguments");
BsonValue id = arguments.get("id");
@@ -63,8 +101,23 @@ OperationResult executeDelete(final BsonDocument operation) {
}
}
+ public OperationResult executeDrop(final BsonDocument operation) {
+ GridFSBucket bucket = getGridFsBucket(operation);
+ BsonDocument arguments = operation.getDocument("arguments", new BsonDocument());
+ if (arguments.size() > 0) {
+ throw new UnsupportedOperationException("Unexpected arguments " + operation.get("arguments"));
+ }
+
+ try {
+ bucket.drop();
+ return OperationResult.NONE;
+ } catch (Exception e) {
+ return OperationResult.of(e);
+ }
+ }
+
public OperationResult executeDownload(final BsonDocument operation) {
- GridFSBucket bucket = entities.getBucket(operation.getString("object").getValue());
+ GridFSBucket bucket = getGridFsBucket(operation);
BsonDocument arguments = operation.getDocument("arguments");
BsonValue id = arguments.get("id");
@@ -119,7 +172,7 @@ private GridFSDownloadOptions getDownloadOptions(final BsonDocument arguments) {
}
public OperationResult executeUpload(final BsonDocument operation) {
- GridFSBucket bucket = entities.getBucket(operation.getString("object").getValue());
+ GridFSBucket bucket = getGridFsBucket(operation);
BsonDocument arguments = operation.getDocument("arguments");
String filename = null;
@@ -165,4 +218,46 @@ public OperationResult executeUpload(final BsonDocument operation) {
Document asDocument(final BsonDocument bsonDocument) {
return new DocumentCodec().decode(new BsonDocumentReader(bsonDocument), DecoderContext.builder().build());
}
+
+ private GridFSBucket getGridFsBucket(final BsonDocument operation) {
+ GridFSBucket bucket = entities.getBucket(operation.getString("object").getValue());
+ Long timeoutMS = getAndRemoveTimeoutMS(operation.getDocument("arguments", new BsonDocument()));
+ if (timeoutMS != null) {
+ bucket = bucket.withTimeout(timeoutMS, TimeUnit.MILLISECONDS);
+ }
+ return bucket;
+ }
+
+ private GridFSFindIterable createGridFSFindIterable(final BsonDocument operation) {
+ GridFSBucket bucket = getGridFsBucket(operation);
+
+ BsonDocument arguments = operation.getDocument("arguments");
+ BsonDocument filter = arguments.getDocument("filter");
+ GridFSFindIterable iterable = bucket.find(filter);
+ for (Map.Entry<String, BsonValue> cur : arguments.entrySet()) {
+ switch (cur.getKey()) {
+ case "session":
+ case "filter":
+ break;
+ case "sort":
+ iterable.sort(cur.getValue().asDocument());
+ break;
+ case "batchSize":
+ iterable.batchSize(cur.getValue().asInt32().intValue());
+ break;
+ case "maxTimeMS":
+ iterable.maxTime(cur.getValue().asInt32().longValue(), TimeUnit.MILLISECONDS);
+ break;
+ case "skip":
+ iterable.skip(cur.getValue().asInt32().intValue());
+ break;
+ case "limit":
+ iterable.limit(cur.getValue().asInt32().intValue());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported argument: " + cur.getKey());
+ }
+ }
+ return iterable;
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedHelper.java
new file mode 100644
index 00000000000..027ccf92fb5
--- /dev/null
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedHelper.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client.unified;
+
+import org.bson.BsonDocument;
+
+abstract class UnifiedHelper {
+
+ static Long getAndRemoveTimeoutMS(final BsonDocument arguments) {
+ Long timeoutMS = null;
+ if (arguments.containsKey("timeoutMS")) {
+ timeoutMS = arguments.getNumber("timeoutMS").longValue();
+ arguments.remove("timeoutMS");
+ }
+ return timeoutMS;
+ }
+}
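
getAndRemoveTimeoutMS deliberately mutates the arguments document: once timeoutMS is consumed, the per-operation argument loops (such as createGridFSFindIterable above) never see the key and so never hit their UnsupportedOperationException branch. A sketch of the intended call pattern, using a made-up operation document:

    // Illustrative unified-format operation, not taken from a spec file.
    BsonDocument operation = BsonDocument.parse(
            "{name: 'find', object: 'bucket', arguments: {filter: {}, timeoutMS: 500}}");
    Long timeoutMS = getAndRemoveTimeoutMS(operation.getDocument("arguments"));
    if (timeoutMS != null) {
        bucket = bucket.withTimeout(timeoutMS, TimeUnit.MILLISECONDS); // per-operation override
    }
    // "arguments" now holds only {filter: {}}, safe for the generic option switch.
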
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index 8b76f426dbc..e7682275438 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -324,6 +324,7 @@ private void assertOperation(final UnifiedTestContext context, final BsonDocumen
private OperationResult executeOperation(final UnifiedTestContext context, final BsonDocument operation, final int operationNum) {
context.getAssertionContext().push(ContextElement.ofStartedOperation(operation, operationNum));
String name = operation.getString("name").getValue();
+ String object = operation.getString("object").getValue();
try {
switch (name) {
case "createEntities":
@@ -393,6 +394,9 @@ private OperationResult executeOperation(final UnifiedTestContext context, final
case "aggregate":
return crudHelper.executeAggregate(operation);
case "find":
+ if ("bucket".equals(object)){
+ return gridFSHelper.executeFind(operation);
+ }
return crudHelper.executeFind(operation);
case "findOne":
return crudHelper.executeFindOne(operation);
@@ -427,6 +431,9 @@ private OperationResult executeOperation(final UnifiedTestContext context, final
case "modifyCollection":
return crudHelper.executeModifyCollection(operation);
case "rename":
+ if ("bucket".equals(object)){
+ return gridFSHelper.executeRename(operation);
+ }
return crudHelper.executeRenameCollection(operation);
case "createSearchIndex":
return crudHelper.executeCreateSearchIndex(operation);
@@ -460,6 +467,8 @@ private OperationResult executeOperation(final UnifiedTestContext context, final
return crudHelper.executeIterateUntilDocumentOrError(operation);
case "delete":
return gridFSHelper.executeDelete(operation);
+ case "drop":
+ return gridFSHelper.executeDrop(operation);
case "download":
return gridFSHelper.executeDownload(operation);
case "downloadByName":
@@ -819,7 +828,7 @@ private OperationResult executeAssertLsidOnLastTwoCommands(final BsonDocument op
operation.getDocument("arguments").getString("client").getValue());
List<CommandEvent> events = lastTwoCommandEvents(listener);
String eventsJson = listener.getCommandStartedEvents().stream()
- .map(e -> ((CommandStartedEvent) e).getCommand().toJson())
+ .map(e -> e.getCommand().toJson())
.collect(Collectors.joining(", "));
BsonDocument expected = ((CommandStartedEvent) events.get(0)).getCommand().getDocument("lsid");
BsonDocument actual = ((CommandStartedEvent) events.get(1)).getCommand().getDocument("lsid");
@@ -885,9 +894,9 @@ private boolean indexExists(final BsonDocument operation) {
}
private List<CommandEvent> lastTwoCommandEvents(final TestCommandListener listener) {
- List<CommandEvent> events = listener.getCommandStartedEvents();
+ List<CommandStartedEvent> events = listener.getCommandStartedEvents();
assertTrue(events.size() >= 2);
- return events.subList(events.size() - 2, events.size());
+ return new ArrayList<>(events.subList(events.size() - 2, events.size()));
}
private void addInitialData() {
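
The dispatch above now keys on the operation's object as well as its name, because the unified format reuses find, rename, and drop for both collections and GridFS buckets. For example, an operation shaped like this (illustrative, with the entity registered under the id "bucket") is routed to gridFSHelper.executeRename rather than crudHelper.executeRenameCollection:

    BsonDocument op = BsonDocument.parse(
            "{name: 'rename', object: 'bucket',"
            + " arguments: {id: {'$oid': '000000000000000000000005'}, newFilename: 'renamed'}}");
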
diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/CollectionTimeoutHelperTest.java b/driver-sync/src/test/unit/com/mongodb/client/gridfs/CollectionTimeoutHelperTest.java
new file mode 100644
index 00000000000..e7e851b270b
--- /dev/null
+++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/CollectionTimeoutHelperTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client.gridfs;
+
+import com.mongodb.MongoOperationTimeoutException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.internal.time.Timeout;
+import org.bson.Document;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.mongodb.client.gridfs.CollectionTimeoutHelper.collectionWithTimeout;
+import static com.mongodb.internal.mockito.MongoMockito.mock;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.longThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("unchecked")
+class CollectionTimeoutHelperTest {
+
+ private static final String TIMEOUT_ERROR_MESSAGE = "message";
+
+ @Test
+ void shouldNotSetRemainingTimeoutWhenTimeoutIsNull() {
+ //given
+ MongoCollection<Document> collection = mock(MongoCollection.class);
+
+ //when
+ MongoCollection<Document> result = collectionWithTimeout(collection, TIMEOUT_ERROR_MESSAGE, null);
+
+ //then
+ assertEquals(collection, result);
+ }
+
+ @Test
+ void shouldNotSetRemainingTimeoutWhenTimeoutIsInfinite() {
+ //given
+ MongoCollection<Document> collection = mock(MongoCollection.class);
+
+ //when
+ MongoCollection<Document> result = collectionWithTimeout(collection, TIMEOUT_ERROR_MESSAGE, Timeout.infinite());
+
+ //then
+ assertEquals(collection, result);
+ }
+
+ @Test
+ void shouldSetRemainingTimeoutWhenTimeout() {
+ //given
+ MongoCollection<Document> collectionWithTimeout = mock(MongoCollection.class);
+ MongoCollection<Document> collection = mock(MongoCollection.class, mongoCollection -> {
+ when(mongoCollection.withTimeout(anyLong(), eq(TimeUnit.MILLISECONDS))).thenReturn(mongoCollection);
+ });
+ Timeout timeout = Timeout.expiresIn(1, TimeUnit.DAYS);
+
+ //when
+ MongoCollection<Document> result = collectionWithTimeout(collection, TIMEOUT_ERROR_MESSAGE, timeout);
+
+ //then
+ verify(collection).withTimeout(longThat(remaining -> remaining > 0), eq(TimeUnit.MILLISECONDS));
+ assertNotEquals(collectionWithTimeout, result);
+ }
+
+ @Test
+ void shouldThrowErrorWhenTimeoutHasExpired() {
+ //given
+ MongoCollection<Document> collection = mock(MongoCollection.class);
+ Timeout timeout = Timeout.expiresIn(1, TimeUnit.MICROSECONDS);
+
+ //when
+ MongoOperationTimeoutException mongoExecutionTimeoutException =
+ assertThrows(MongoOperationTimeoutException.class, () -> collectionWithTimeout(collection, TIMEOUT_ERROR_MESSAGE, timeout));
+
+ //then
+ assertEquals(TIMEOUT_ERROR_MESSAGE, mongoExecutionTimeoutException.getMessage());
+ verifyNoInteractions(collection);
+ }
+
+ @Test
+ void shouldThrowErrorWhenTimeoutHasExpiredWithZeroRemaining() {
+ //given
+ MongoCollection<Document> collection = mock(MongoCollection.class);
+ Timeout timeout = mock(Timeout.class, timeout1 -> {
+ when(timeout1.hasExpired()).thenReturn(true);
+ when(timeout1.isInfinite()).thenReturn(false);
+ when(timeout1.remaining(TimeUnit.MILLISECONDS)).thenReturn(0L);
+ });
+
+ //when
+ assertThrows(MongoOperationTimeoutException.class, () -> collectionWithTimeout(collection, TIMEOUT_ERROR_MESSAGE, timeout));
+
+ //then
+ verifyNoInteractions(collection);
+ }
+}
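
Taken together these tests pin down the helper's contract: pass the collection through untouched for a null or infinite timeout, fail fast with the supplied message once the deadline has passed (without ever touching the collection), and otherwise re-bind the collection to the remaining budget. A minimal sketch of an implementation satisfying them (the production CollectionTimeoutHelper is referenced but not shown in this excerpt, so treat this as an assumption about its shape):

    static <T> MongoCollection<T> collectionWithTimeout(final MongoCollection<T> collection,
                                                        final String message,
                                                        @Nullable final Timeout timeout) {
        if (timeout == null || timeout.isInfinite()) {
            return collection;                                 // no deadline to apply
        }
        long remaining = timeout.remaining(TimeUnit.MILLISECONDS);
        if (timeout.hasExpired() || remaining <= 0) {
            throw new MongoOperationTimeoutException(message); // fail fast, collection untouched
        }
        return collection.withTimeout(remaining, TimeUnit.MILLISECONDS);
    }
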
diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketSpecification.groovy
index fdae1bee894..ac3fb98281c 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketSpecification.groovy
@@ -33,6 +33,7 @@ import com.mongodb.client.internal.OperationExecutor
import com.mongodb.client.internal.TestOperationExecutor
import com.mongodb.client.result.DeleteResult
import com.mongodb.client.result.UpdateResult
+import com.mongodb.internal.TimeoutSettings
import com.mongodb.internal.operation.BatchCursor
import com.mongodb.internal.operation.FindOperation
import org.bson.BsonDocument
@@ -45,6 +46,8 @@ import org.bson.types.ObjectId
import spock.lang.Specification
import spock.lang.Unroll
+import java.util.concurrent.TimeUnit
+
import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS
import static com.mongodb.CustomMatchers.isTheSameAs
import static com.mongodb.ReadPreference.primary
@@ -156,7 +159,9 @@ class GridFSBucketSpecification extends Specification {
given:
def defaultChunkSizeBytes = 255 * 1024
def database = new MongoDatabaseImpl('test', fromProviders(new DocumentCodecProvider()), secondary(), WriteConcern.ACKNOWLEDGED,
- false, false, readConcern, JAVA_LEGACY, null, null, new TestOperationExecutor([]))
+ false, false, readConcern, JAVA_LEGACY, null,
+ new TimeoutSettings(0, 0, 0, null, 0),
+ new TestOperationExecutor([]))
when:
def gridFSBucket = new GridFSBucketImpl(database)
@@ -172,6 +177,9 @@ class GridFSBucketSpecification extends Specification {
given:
def filesCollection = Stub(MongoCollection)
def chunksCollection = Stub(MongoCollection)
+ filesCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
+ chunksCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
+
def gridFSBucket = new GridFSBucketImpl('fs', 255, filesCollection, chunksCollection)
when:
@@ -184,7 +192,7 @@ class GridFSBucketSpecification extends Specification {
then:
expect stream, isTheSameAs(new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, stream.getId(), 'filename',
- 255, null), ['closeLock'])
+ 255, null, null), ['closeLock'])
where:
clientSession << [null, Stub(ClientSession)]
@@ -291,7 +299,9 @@ class GridFSBucketSpecification extends Specification {
def fileInfo = new GridFSFile(fileId, 'File 1', 10, 255, new Date(), new Document())
def findIterable = Mock(FindIterable)
def filesCollection = Mock(MongoCollection)
+ filesCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
def chunksCollection = Stub(MongoCollection)
+ chunksCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
def gridFSBucket = new GridFSBucketImpl('fs', 255, filesCollection, chunksCollection)
when:
@@ -312,7 +322,8 @@ class GridFSBucketSpecification extends Specification {
1 * findIterable.first() >> fileInfo
then:
- expect stream, isTheSameAs(new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection), ['closeLock', 'cursorLock'])
+ expect stream, isTheSameAs(new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection,
+ null), ['closeLock', 'cursorLock'])
where:
@@ -516,7 +527,9 @@ class GridFSBucketSpecification extends Specification {
def fileInfo = new GridFSFile(bsonFileId, filename, 10, 255, new Date(), new Document())
def findIterable = Mock(FindIterable)
def filesCollection = Mock(MongoCollection)
+ filesCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
def chunksCollection = Stub(MongoCollection)
+ chunksCollection.getTimeout(TimeUnit.MILLISECONDS) >> null
def gridFSBucket = new GridFSBucketImpl('fs', 255, filesCollection, chunksCollection)
when:
@@ -534,7 +547,7 @@ class GridFSBucketSpecification extends Specification {
1 * findIterable.first() >> fileInfo
then:
- expect stream, isTheSameAs(new GridFSDownloadStreamImpl(null, fileInfo, chunksCollection), ['closeLock', 'cursorLock'])
+ expect stream, isTheSameAs(new GridFSDownloadStreamImpl(null, fileInfo, chunksCollection, null), ['closeLock', 'cursorLock'])
where:
version | skip | sortOrder
diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSDownloadStreamSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSDownloadStreamSpecification.groovy
index 99e8f7a2167..90b53014ed2 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSDownloadStreamSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSDownloadStreamSpecification.groovy
@@ -33,7 +33,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should return the file info'() {
when:
- def downloadStream = new GridFSDownloadStreamImpl(null, fileInfo, Stub(MongoCollection))
+ def downloadStream = new GridFSDownloadStreamImpl(null, fileInfo, Stub(MongoCollection), null)
then:
downloadStream.getGridFSFile() == fileInfo
@@ -56,7 +56,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
then:
downloadStream.available() == 0
@@ -126,7 +126,8 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection).batchSize(1)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection,
+ null).batchSize(1)
then:
downloadStream.available() == 0
@@ -205,7 +206,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
def skipResult = downloadStream.skip(15)
@@ -281,7 +282,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
def readByte = new byte[10]
@@ -346,7 +347,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
downloadStream.mark()
@@ -421,7 +422,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
def readByte = new byte[25]
@@ -478,7 +479,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should not throw an exception when trying to mark post close'() {
given:
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection))
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection), null)
downloadStream.close()
when:
@@ -499,7 +500,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should handle negative skip value correctly '() {
given:
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection))
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection), null)
when:
def result = downloadStream.skip(-1)
@@ -514,7 +515,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should handle skip that is larger or equal to the file length'() {
given:
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
def result = downloadStream.skip(skipValue)
@@ -535,7 +536,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should throw if trying to pass negative batchSize'() {
given:
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection))
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection), null)
when:
downloadStream.batchSize(0)
@@ -559,7 +560,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
downloadStream.read()
@@ -591,7 +592,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def mongoCursor = Mock(MongoCursor)
def findIterable = Mock(FindIterable)
def chunksCollection = Mock(MongoCollection)
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection)
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, chunksCollection, null)
when:
downloadStream.read()
@@ -617,7 +618,7 @@ class GridFSDownloadStreamSpecification extends Specification {
def 'should throw an exception when trying to action post close'() {
given:
- def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection))
+ def downloadStream = new GridFSDownloadStreamImpl(clientSession, fileInfo, Stub(MongoCollection), null)
downloadStream.close()
when:
diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSUploadStreamSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSUploadStreamSpecification.groovy
index cff602d6f9a..04a11e85550 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSUploadStreamSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSUploadStreamSpecification.groovy
@@ -34,7 +34,7 @@ class GridFSUploadStreamSpecification extends Specification {
def 'should return the file id'() {
when:
def uploadStream = new GridFSUploadStreamImpl(null, Stub(MongoCollection), Stub(MongoCollection), fileId, filename, 255
- , metadata)
+ , metadata, null)
then:
uploadStream.getId() == fileId
}
@@ -44,7 +44,7 @@ class GridFSUploadStreamSpecification extends Specification {
def filesCollection = Mock(MongoCollection)
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 2
- , metadata)
+ , metadata, null)
when:
uploadStream.write(1)
@@ -70,7 +70,7 @@ class GridFSUploadStreamSpecification extends Specification {
def filesCollection = Mock(MongoCollection)
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255
- , null)
+ , null, null)
when:
uploadStream.write('file content ' as byte[])
@@ -100,7 +100,8 @@ class GridFSUploadStreamSpecification extends Specification {
def chunksCollection = Mock(MongoCollection)
def content = 'file content ' as byte[]
def metadata = new Document('contentType', 'text/txt')
- def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255, metadata)
+ def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255,
+ metadata, null)
def filesId = fileId
when:
@@ -158,7 +159,7 @@ class GridFSUploadStreamSpecification extends Specification {
def filesCollection = Mock(MongoCollection)
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255
- , metadata)
+ , metadata, null)
when:
uploadStream.close()
@@ -178,7 +179,7 @@ class GridFSUploadStreamSpecification extends Specification {
given:
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, Stub(MongoCollection), chunksCollection, fileId, filename, 255
- , metadata)
+ , metadata, null)
when:
uploadStream.write('file content ' as byte[])
@@ -198,7 +199,7 @@ class GridFSUploadStreamSpecification extends Specification {
def 'should close the stream on abort'() {
given:
def uploadStream = new GridFSUploadStreamImpl(clientSession, Stub(MongoCollection), Stub(MongoCollection), fileId, filename, 255
- , metadata)
+ , metadata, null)
uploadStream.write('file content ' as byte[])
uploadStream.abort()
@@ -216,7 +217,7 @@ class GridFSUploadStreamSpecification extends Specification {
given:
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, Stub(MongoCollection), chunksCollection, fileId, filename, 255
- , metadata)
+ , metadata, null)
when:
uploadStream.write('file content ' as byte[])
@@ -234,7 +235,7 @@ class GridFSUploadStreamSpecification extends Specification {
def filesCollection = Mock(MongoCollection)
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255
- , metadata)
+ , metadata, null)
when:
uploadStream.close()
uploadStream.write(1)
@@ -252,7 +253,7 @@ class GridFSUploadStreamSpecification extends Specification {
def filesCollection = Mock(MongoCollection)
def chunksCollection = Mock(MongoCollection)
def uploadStream = new GridFSUploadStreamImpl(clientSession, filesCollection, chunksCollection, fileId, filename, 255
- , metadata)
+ , metadata, null)
when:
uploadStream.getObjectId()