Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide programmatic transaction for MongoDB reactive with Panache #32794

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 22 additions & 3 deletions docs/src/main/asciidoc/mongodb-panache.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -768,11 +768,9 @@

To use them with MongoDB with Panache you need to annotate the method that starts the transaction with the `@Transactional` annotation.

In MongoDB, a transaction is only possible on a replicaset,

Check warning on line 771 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'?", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 771, "column": 27}}}, "severity": "WARNING"}
luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions.

Check warning on line 772 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'replicaset'?", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 772, "column": 69}}}, "severity": "WARNING"}

WARNING: Transaction support inside MongoDB with Panache is still experimental.

== Custom IDs

IDs are often a touchy subject. In MongoDB, they are usually auto-generated by the database with an `ObjectId` type.
Expand Down Expand Up @@ -1006,7 +1004,28 @@

TIP: `@RestStreamElementType(MediaType.APPLICATION_JSON)` tells RESTEasy Reactive to serialize the object in JSON.

WARNING: Transactions are not supported for Reactive Entities and Repositories.
=== Reactive transactions

MongoDB offers ACID transactions since version 4.0.

To use them with reactive entities or repositories you need to use `io.quarkus.mongodb.panache.common.reactive.Panache.withTransaction()`.

Check warning on line 1011 in docs/src/main/asciidoc/mongodb-panache.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'.", "location": {"path": "docs/src/main/asciidoc/mongodb-panache.adoc", "range": {"start": {"line": 1011, "column": 34}}}, "severity": "INFO"}

[source,java]
----
@POST
public Uni<Response> addPerson(ReactiveTransactionPerson person) {
return Panache.withTransaction(() -> person.persist().map(v -> {
//the ID is populated before sending it to the database
String id = person.id.toString();
return Response.created(URI.create("/reactive-transaction/" + id)).build();
}));
}
----

In MongoDB, a transaction is only possible on a replicaset,
luckily our xref:mongodb.adoc#dev-services[Dev Services for MongoDB] setups a single node replicaset so it is compatible with transactions.

WARNING: Reactive transaction support inside MongoDB with Panache is still experimental.

== Mocking

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.quarkus.mongodb.panache.common.reactive;

import java.util.UUID;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

import com.mongodb.reactivestreams.client.ClientSession;

import io.quarkus.mongodb.panache.common.runtime.BeanUtils;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import mutiny.zero.flow.adapters.AdaptersToFlow;

/**
* Utility class for reactive MongoDB with Panache.
*/
public class Panache {
private static final String ERROR_MSG = "MongoDB reactive with Panache requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such.";

private static final UUID SESSION_KEY = UUID.randomUUID();

/**
* Performs the given work within the scope of a MongoDB transaction.
* The transaction will be rolled back if the work completes with an uncaught exception.
*
* @param <T> The function's return type
* @param work The function to execute in the new transaction
* @return the result of executing the function
*/
public static <T> Uni<T> withTransaction(Supplier<Uni<T>> work) {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
if (current != null && current.hasActiveTransaction()) {
// reactive session exists - reuse this session
return work.get();
} else {
// reactive session does not exist - open a new one and close it when the returned Uni completes
return Panache.startSession()
.invoke(s -> s.startTransaction())
.invoke(s -> context.putLocal(SESSION_KEY, s))
.chain(s -> work.get())
.call(() -> commitTransaction())
.onFailure().call(() -> abortTransaction())
.eventually(() -> Panache.closeSession());
}
}

/**
* Allow to access the current MongoDB session.
* The session will only exist in the context of a reactive MongoDB with Panache transaction started with
* <code>Panache.withTransaction()</code>.
*
* @see #withTransaction(Supplier)
* @return the current ClientSession or null if none.
*/
public static ClientSession getCurrentSession() {
Context context = Vertx.currentContext();
return context != null ? context.getLocal(SESSION_KEY) : null;
}

private static Uni<?> abortTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.abortTransaction());
}

private static Uni<?> commitTransaction() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
return toUni(current.commitTransaction());
}

private static <T> Uni<T> toUni(Publisher<T> publisher) {
Context context = Vertx.currentContext();
Uni<T> uni = Uni.createFrom().publisher(AdaptersToFlow.publisher(publisher));
if (context != null) {
return uni.emitOn(command -> context.runOnContext(x -> command.run()));
}
return uni;
}

private static Uni<ClientSession> startSession() {
ReactiveMongoClient client = BeanUtils.clientFromArc(null, ReactiveMongoClient.class, true);
return client.startSession();
}

private static void closeSession() {
Context context = vertxContext();
ClientSession current = context.getLocal(SESSION_KEY);
try {
current.close();
} finally {
context.removeLocal(SESSION_KEY);
}
}

/**
*
* @return the current vertx duplicated context
* @throws IllegalStateException If no vertx context is found or is not a safe context as mandated by the
* {@link VertxContextSafetyToggle}
*/
private static Context vertxContext() {
Context context = Vertx.currentContext();
if (context != null) {
VertxContextSafetyToggle.validateContextIfExists(ERROR_MSG, ERROR_MSG);
return context;
} else {
throw new IllegalStateException("No current Vertx context found");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.OptionalInt;
import java.util.Set;

import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

Expand All @@ -13,6 +14,7 @@
import com.mongodb.client.model.CountOptions;

import io.quarkus.mongodb.FindOptions;
import io.quarkus.mongodb.panache.common.reactive.Panache;
import io.quarkus.mongodb.panache.common.runtime.MongoPropertyUtil;
import io.quarkus.mongodb.reactive.ReactiveMongoCollection;
import io.quarkus.panache.common.Page;
Expand Down Expand Up @@ -168,9 +170,11 @@ public Uni<Long> count() {
countOptions.collation(collation);
}

count = mongoQuery == null
? collection.countDocuments()
: collection.countDocuments(mongoQuery, countOptions);
if (Panache.getCurrentSession() != null) {
count = collection.countDocuments(Panache.getCurrentSession(), getQuery(), countOptions);
} else {
count = collection.countDocuments(getQuery(), countOptions);
}
}
return count;
}
Expand All @@ -184,7 +188,8 @@ public <T extends Entity> Uni<List<T>> list() {
@SuppressWarnings("unchecked")
public <T extends Entity> Multi<T> stream() {
FindOptions options = buildOptions();
return mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
return Panache.getCurrentSession() != null ? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
}

public <T extends Entity> Uni<T> firstResult() {
Expand All @@ -194,14 +199,18 @@ public <T extends Entity> Uni<T> firstResult() {

public <T extends Entity> Uni<Optional<T>> firstResultOptional() {
FindOptions options = buildOptions(1);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().first().map(o -> Optional.ofNullable(o));
}

@SuppressWarnings("unchecked")
public <T extends Entity> Uni<T> singleResult() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() != 1) {
throw new PanacheQueryException("There should be only one result");
Expand All @@ -213,7 +222,9 @@ public <T extends Entity> Uni<T> singleResult() {

public <T extends Entity> Uni<Optional<T>> singleResultOptional() {
FindOptions options = buildOptions(2);
Multi<T> results = mongoQuery == null ? collection.find(options) : collection.find(mongoQuery, options);
Multi<T> results = Panache.getCurrentSession() != null
? collection.find(Panache.getCurrentSession(), getQuery(), options)
: collection.find(getQuery(), options);
return results.collect().asList().map(list -> {
if (list.size() == 2) {
throw new PanacheQueryException("There should be no more than one result");
Expand Down Expand Up @@ -258,4 +269,8 @@ private FindOptions buildOptions(int maxResults) {
}
return options.limit(maxResults);
}

private Bson getQuery() {
return mongoQuery == null ? new BsonDocument() : mongoQuery;
}
}