Skip to content

Commit

Permalink
Provide programmative transaction for MongoDB reactive with Panache
Browse files Browse the repository at this point in the history
Fixes #32156
  • Loading branch information
loicmathieu committed Jun 6, 2023
1 parent 5ed7597 commit 1d7e7df
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 10 deletions.
25 changes: 22 additions & 3 deletions docs/src/main/asciidoc/mongodb-panache.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,6 @@ To use them with MongoDB with Panache you need to annotate the method that start
In MongoDB, a transaction is only possible on a replicaset,

Check warning on line 771 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'?", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 771, "column": 27}}}, "severity": "WARNING"}
luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions.

Check warning on line 772 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'?", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 772, "column": 69}}}, "severity": "WARNING"}

WARNING: Transaction support inside MongoDB with Panache is still experimental.

== Custom IDs

IDs are often a touchy subject. In MongoDB, they are usually auto-generated by the database with an `ObjectId` type.
Expand Down Expand Up @@ -1006,7 +1004,28 @@ public Multi<ReactivePerson> streamPersons() {

TIP: `@RestStreamElementType(MediaType.APPLICATION_JSON)` tells RESTEasy Reactive to serialize the object in JSON.

WARNING: Transactions are not supported for Reactive Entities and Repositories.
=== Reactive transactions

MongoDB offers ACID transactions since version 4.0.

To use them with reactive entities or repositories you need to use `io.quarkus.mongodb.panache.common.reactive.Panache.withTransaction()`.

Check warning on line 1011 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'.", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 1011, "column": 34}}}, "severity": "INFO"}

[source,java]
----
@POST
public Uni<Response> addPerson(ReactiveTransactionPerson person) {
return Panache.withTransaction(() -> person.persist().map(v -> {
//the ID is populated before sending it to the database
String id = person.id.toString();
return Response.created(URI.create("/reactive-transaction/" + id)).build();
}));
}
----

In MongoDB, a transaction is only possible on a replicaset,
luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions.

WARNING: Reactive transaction support inside MongoDB with Panache is still experimental.

== Mocking

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.quarkus.mongodb.panache.common.reactive;

import java.util.UUID;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

import com.mongodb.reactivestreams.client.ClientSession;

import io.quarkus.mongodb.panache.common.runtime.BeanUtils;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import mutiny.zero.flow.adapters.AdaptersToFlow;

/**
* Utility class for reactive MongoDB with Panache.
*/
public class Panache {
private static final String ERROR_MSG = "MongoDB reactive with Panache requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such.";

private static final UUID SESSION_KEY = UUID.randomUUID();

/**
* Performs the given work within the scope of a MongoDB transaction.
* The transaction will be rolled back if the work completes with an uncaught exception.
*
* @param <T> The function's return type
* @param work The function to execute in the new transaction
* @return the result of executing the function
*/
public static <T> Uni<T> withTransaction(Supplier<Uni<T>> work) {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
if (current != null && current.hasActiveTransaction()) {
// reactive session exists - reuse this session
return work.get();
} else {
// reactive session does not exist - open a new one and close it when the returned Uni completes
return Panache.startSession()
.invoke(s -> s.startTransaction())
.invoke(s -> context.putLocal(SESSION_KEY, s))
.chain(s -> work.get())
.call(() -> commitTransaction())
.onFailure().call(() -> abortTransaction())
.eventually(() -> Panache.closeSession());
}
}

/**
* Allow to access the current MongoDB session.
* The session will only exist in the context of a reactive MongoDB with Panache transaction started with
* <code>Panache.withTransaction()</code>.
*
* @see #withTransaction(Supplier)
* @return the current ClientSession or null if none.
*/
public static ClientSession getCurrentSession() {
Context context = Vertx.currentContext();
return context != null ? context.getLocal(SESSION_KEY) : null;
}

private static Uni<?> abortTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.abortTransaction());
}

private static Uni<?> commitTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.commitTransaction());
}

private static <T> Uni<T> toUni(Publisher<T> publisher) {
Context context = Vertx.currentContext();
Uni<T> uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher));
if (context != null) {
return uni.emitOn(command -> context.runOnContext(x -> command.run()));
}
return uni;
}

private static Uni<ClientSession> startSession() {
ReactiveMongoClient client = BeanUtils.clientFromArc(null, ReactiveMongoClient.class, true);
return client.startSession();
}

private static void closeSession() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
try {
current.close();
}
finally {
context.removeLocal(SESSION_KEY);
}
}

/**
*
* @return the current vertx duplicated context
* @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the
* {@link VertxContextSafetyToggle}
*/
private static Context vertxContext() {
Context context = Vertx.currentContext();
if (context != null) {
VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG);
return context;
} else {
throw new IllegalStateException("No current Vertx context found");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.OptionalInt;
import java.util.Set;

import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

Expand All @@ -13,6 +14,7 @@
import com.mongodb.client.model.CountOptions;

import io.quarkus.mongodb.FindOptions;
import io.quarkus.mongodb.panache.common.reactive.Panache;
import io.quarkus.mongodb.panache.common.runtime.MongoPropertyUtil;
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.quarkus.panache.common.Page;
Expand Down Expand Up @@ -168,9 +170,11 @@ public Uni<Long> count() {
countOptions.collation(collation);
}

count = mongoQuery == null
? collection.countDocuments()
: collection.countDocuments(mongoQuery, countOptions);
if (Panache.getCurrentSession() != null) {
count = collection.countDocuments(Panache.getCurrentSession(), getQuery(), countOptions);
} else {
count = collection.countDocuments(getQuery(), countOptions);
}
}
return count;
}
Expand All @@ -184,7 +188,8 @@ public <T extends Entity> Uni<List<T>> list() {
@SuppressWarnings("unchecked")
public <T extends Entity> Multi<T> stream() {
FindOptions options = buildOptions();
return mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
return Panache.getCurrentSession() != null ? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
}

public <T extends Entity> Uni<T> firstResult() {
Expand All @@ -194,14 +199,18 @@ public <T extends Entity> Uni<T> firstResult() {

public <T extends Entity> Uni<Optional<T>> firstResultOptional() {
FindOptions options = buildOptions(1);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().first().map(o -> Optional.ofNullable(o));
}

@SuppressWarnings("unchecked")
public <T extends Entity> Uni<T> singleResult() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() != 1) {
throw new PanacheQueryException("There should be only one result");
Expand All @@ -213,7 +222,9 @@ public <T extends Entity> Uni<T> singleResult() {

public <T extends Entity> Uni<Optional<T>> singleResultOptional() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() == 2) {
throw new PanacheQueryException("There should be no more than one result");
Expand Down Expand Up @@ -258,4 +269,8 @@ private FindOptions buildOptions(int maxResults) {
}
return options.limit(maxResults);
}

private Bson getQuery() {
return mongoQuery == null ? new BsonDocument() : mongoQuery;
}
}

0 comments on commit 1d7e7df

Please sign in to comment.