Skip to content

Commit 089a55b

Browse files
authoredJan 22, 2025
Add Scala Client Bulk Write API. (#1603)
JAVA-5531
1 parent 51c736e commit 089a55b

File tree

8 files changed

+515
-27
lines changed

8 files changed

+515
-27
lines changed
 

‎driver-scala/src/integration/scala/org/mongodb/scala/syncadapter/SyncMongoCluster.scala

+4-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.mongodb.scala.syncadapter
22

3-
import com.mongodb.assertions.Assertions
43
import com.mongodb.client.model.bulk.{ ClientBulkWriteOptions, ClientBulkWriteResult, ClientNamespacedWriteModel }
54
import com.mongodb.{ ClientSessionOptions, ReadConcern, ReadPreference, WriteConcern }
65
import com.mongodb.client.{ ClientSession, MongoCluster => JMongoCluster, MongoDatabase => JMongoDatabase }
@@ -129,33 +128,21 @@ class SyncMongoCluster(wrapped: MongoCluster) extends JMongoCluster {
129128

130129
override def bulkWrite(
131130
models: util.List[_ <: ClientNamespacedWriteModel]
132-
): ClientBulkWriteResult = {
133-
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
134-
throw Assertions.fail("TODO-JAVA-5531 implement")
135-
}
131+
): ClientBulkWriteResult = wrapped.bulkWrite(models.asScala.toList).toFuture().get()
136132

137133
override def bulkWrite(
138134
models: util.List[_ <: ClientNamespacedWriteModel],
139135
options: ClientBulkWriteOptions
140-
): ClientBulkWriteResult = {
141-
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
142-
throw Assertions.fail("TODO-JAVA-5531 implement")
143-
}
136+
): ClientBulkWriteResult = wrapped.bulkWrite(models.asScala.toList, options).toFuture().get()
144137

145138
override def bulkWrite(
146139
clientSession: ClientSession,
147140
models: util.List[_ <: ClientNamespacedWriteModel]
148-
): ClientBulkWriteResult = {
149-
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
150-
throw Assertions.fail("TODO-JAVA-5531 implement")
151-
}
141+
): ClientBulkWriteResult = wrapped.bulkWrite(unwrap(clientSession), models.asScala.toList).toFuture().get()
152142

153143
override def bulkWrite(
154144
clientSession: ClientSession,
155145
models: util.List[_ <: ClientNamespacedWriteModel],
156146
options: ClientBulkWriteOptions
157-
): ClientBulkWriteResult = {
158-
org.junit.Assume.assumeTrue("TODO-JAVA-5531 implement", java.lang.Boolean.parseBoolean(toString))
159-
throw Assertions.fail("TODO-JAVA-5531 implement")
160-
}
147+
): ClientBulkWriteResult = wrapped.bulkWrite(unwrap(clientSession), models.asScala.toList, options).toFuture().get()
161148
}

‎driver-scala/src/main/scala/org/mongodb/scala/MongoCluster.scala

+123
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import com.mongodb.reactivestreams.client.{ MongoCluster => JMongoCluster }
2222
import org.bson.codecs.configuration.CodecRegistry
2323
import org.mongodb.scala.bson.DefaultHelper.DefaultsTo
2424
import org.mongodb.scala.bson.conversions.Bson
25+
import org.mongodb.scala.model.bulk.ClientNamespacedUpdateManyModel
26+
import org.mongodb.scala.model.bulk.ClientNamespacedDeleteManyModel
27+
import org.mongodb.scala.model.bulk.{ ClientBulkWriteOptions, ClientBulkWriteResult, ClientNamespacedWriteModel }
2528

2629
import scala.collection.JavaConverters._
2730
import scala.concurrent.duration.{ Duration, MILLISECONDS }
@@ -290,4 +293,124 @@ class MongoCluster(private val wrapped: JMongoCluster) {
290293
)(implicit e: C DefaultsTo Document, ct: ClassTag[C]): ChangeStreamObservable[C] =
291294
ChangeStreamObservable(wrapped.watch(clientSession, pipeline.asJava, ct))
292295

296+
/**
297+
* Executes a client-level bulk write operation.
298+
* This method is functionally equivalent to `bulkWrite(List, ClientBulkWriteOptions)`
299+
* with the [[ClientBulkWriteOptions.clientBulkWriteOptions default options]].
300+
*
301+
* This operation supports retryable writes.
302+
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
303+
* executing this operation may require multiple `bulkWrite` commands.
304+
* The eligibility for retries is determined per each `bulkWrite` command:
305+
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
306+
*
307+
* This operation is not supported by MongoDB Atlas Serverless instances.
308+
*
309+
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
310+
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
311+
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
312+
* or the following errors:
313+
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
314+
* and there is at least one of the following pieces of information to report:
315+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
316+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
317+
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
318+
* - [[MongoException]]: Only if the operation is unsuccessful.
319+
* @since 5.4
320+
* @note Requires MongoDB 8.0 or greater.
321+
*/
322+
def bulkWrite(models: List[_ <: ClientNamespacedWriteModel]): SingleObservable[ClientBulkWriteResult] =
323+
wrapped.bulkWrite(models.asJava)
324+
325+
/**
326+
* Executes a client-level bulk write operation.
327+
*
328+
* This operation supports retryable writes.
329+
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
330+
* executing this operation may require multiple `bulkWrite` commands.
331+
* The eligibility for retries is determined per each `bulkWrite` command:
332+
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
333+
*
334+
* This operation is not supported by MongoDB Atlas Serverless instances.
335+
*
336+
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
337+
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
338+
* @param options The options.
339+
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
340+
* or the following errors:
341+
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
342+
* and there is at least one of the following pieces of information to report:
343+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
344+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
345+
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
346+
* - [[MongoException]]: Only if the operation is unsuccessful.
347+
* @since 5.4
348+
* @note Requires MongoDB 8.0 or greater.
349+
*/
350+
def bulkWrite(
351+
models: List[_ <: ClientNamespacedWriteModel],
352+
options: ClientBulkWriteOptions
353+
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(models.asJava, options)
354+
355+
/**
356+
* Executes a client-level bulk write operation.
357+
* This method is functionally equivalent to `bulkWrite(ClientSession, List, ClientBulkWriteOptions)`
358+
* with the [[ClientBulkWriteOptions.clientBulkWriteOptions default options]].
359+
*
360+
* This operation supports retryable writes.
361+
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
362+
* executing this operation may require multiple `bulkWrite` commands.
363+
* The eligibility for retries is determined per each `bulkWrite` command:
364+
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
365+
*
366+
* This operation is not supported by MongoDB Atlas Serverless instances.
367+
*
368+
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
369+
* @param clientSession [[ClientSession client session]] with which to associate this operation.
370+
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
371+
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
372+
* or the following errors:
373+
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
374+
* and there is at least one of the following pieces of information to report:
375+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
376+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
377+
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
378+
* - [[MongoException]]: Only if the operation is unsuccessful.
379+
* @since 5.4
380+
* @note Requires MongoDB 8.0 or greater.
381+
*/
382+
def bulkWrite(
383+
clientSession: ClientSession,
384+
models: List[_ <: ClientNamespacedWriteModel]
385+
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(clientSession, models.asJava)
386+
387+
/**
388+
* Executes a client-level bulk write operation.
389+
*
390+
* This operation supports retryable writes.
391+
* Depending on the number of `models`, encoded size of `models`, and the size limits in effect,
392+
* executing this operation may require multiple `bulkWrite` commands.
393+
* The eligibility for retries is determined per each `bulkWrite` command:
394+
* [[ClientNamespacedUpdateManyModel]], [[ClientNamespacedDeleteManyModel]] in a command render it non-retryable.
395+
*
396+
* [[https://www.mongodb.com/docs/manual/reference/command/bulkWrite/ bulkWrite]]
397+
* @param clientSession The [[ClientSession client session]] with which to associate this operation.
398+
* @param models The [[ClientNamespacedWriteModel]] individual write operations.
399+
* @param options The options.
400+
* @return The [[SingleObservable]] signalling at most one element [[ClientBulkWriteResult]] if the operation is successful,
401+
* or the following errors:
402+
* - [[ClientBulkWriteException]]: If and only if the operation is unsuccessful or partially unsuccessful,
403+
* and there is at least one of the following pieces of information to report:
404+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteConcernErrors]],
405+
* [[ClientBulkWriteException ClientBulkWriteException#getWriteErrors]],
406+
* [[ClientBulkWriteException ClientBulkWriteException#getPartialResult]].
407+
* - [[MongoException]]: Only if the operation is unsuccessful.
408+
* @since 5.4
409+
* @note Requires MongoDB 8.0 or greater.
410+
*/
411+
def bulkWrite(
412+
clientSession: ClientSession,
413+
models: List[_ <: ClientNamespacedWriteModel],
414+
options: ClientBulkWriteOptions
415+
): SingleObservable[ClientBulkWriteResult] = wrapped.bulkWrite(clientSession, models.asJava, options)
293416
}

0 commit comments

Comments
 (0)
Failed to load comments.