-
Notifications
You must be signed in to change notification settings - Fork 25
/
SqlClientHelper.java
112 lines (101 loc) · 4.58 KB
/
SqlClientHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package io.vertx.mutiny.sqlclient;
import java.util.function.Function;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.sqlclient.TransactionRollbackException;
/**
 * Utilities for generating {@link Multi} and {@link Uni} with a {@link SqlClient}.
 * <p>
 * The {@code usingConnection*} helpers acquire a {@link SqlConnection} from a {@link Pool}, hand it to a
 * user-provided function and close it when the resulting stream terminates. The {@code inTransaction*}
 * helpers build on top of them: they begin a {@link Transaction} on the acquired connection, commit it
 * when the user stream succeeds and attempt a rollback when it fails.
 */
public class SqlClientHelper {

    /**
     * Generates a {@link Multi} from operations executed inside a {@link Transaction}.
     * <p>
     * The transaction is committed when the user-provided {@link Multi} completes. When the stream (or the
     * commit) fails with anything other than a {@link TransactionRollbackException}, a rollback is attempted
     * and the original failure is then propagated downstream.
     *
     * @param pool the {@link Pool}
     * @param sourceSupplier a user-provided function returning a {@link Multi} generated by interacting with the given
     *        {@link SqlClient}
     * @param <T> the type of the items emitted by the {@link Multi}
     * @return a {@link Multi} generated from operations executed inside a {@link Transaction}
     */
    public static <T> Multi<T> inTransactionMulti(Pool pool, Function<SqlClient, Multi<T>> sourceSupplier) {
        return usingConnectionMulti(pool, conn -> {
            return conn.begin().onItem().transformToMulti(tx -> {
                return sourceSupplier.apply(conn)
                        // Commit only once every item has been emitted successfully.
                        .onCompletion().call(tx::commit)
                        .onFailure(SqlClientHelper::needsRollback).recoverWithMulti(err -> rollbackMulti(tx, err));
            });
        });
    }

    /**
     * {@link Multi} flavour of {@link #rollbackUni(Transaction, Throwable)}.
     *
     * @param tx the transaction to roll back
     * @param originalErr the failure that triggered the rollback
     * @param <T> the item type expected by the caller
     * @return a {@link Multi} that fails with {@code originalErr} after the rollback attempt
     */
    private static <T> Multi<T> rollbackMulti(Transaction tx, Throwable originalErr) {
        return SqlClientHelper.<T> rollbackUni(tx, originalErr).toMulti();
    }

    /**
     * Generates a {@link Uni} from operations executed inside a {@link Transaction}.
     * <p>
     * The transaction is committed when the user-provided {@link Uni} emits its item. When the {@link Uni}
     * (or the commit) fails with anything other than a {@link TransactionRollbackException}, a rollback is
     * attempted and the original failure is then propagated downstream.
     *
     * @param pool the {@link Pool}
     * @param sourceSupplier a user-provided function returning a {@link Uni} generated by interacting with the given
     *        {@link SqlClient}
     * @param <T> the type of the items emitted by the {@link Uni}
     * @return a {@link Uni} generated from operations executed inside a {@link Transaction}
     */
    public static <T> Uni<T> inTransactionUni(Pool pool, Function<SqlClient, Uni<T>> sourceSupplier) {
        return usingConnectionUni(pool, conn -> conn.begin().onItem().transformToUni(tx -> {
            return sourceSupplier.apply(conn)
                    .onItem().call(tx::commit)
                    .onFailure(SqlClientHelper::needsRollback).recoverWithUni(err -> rollbackUni(tx, err));
        }));
    }

    /**
     * Tells whether a failure must trigger an explicit rollback.
     *
     * @param throwable the failure observed on the transactional stream
     * @return {@code false} for a {@link TransactionRollbackException} (presumably because the transaction
     *         has already been rolled back in that case), {@code true} for any other failure
     */
    private static boolean needsRollback(Throwable throwable) {
        return !(throwable instanceof TransactionRollbackException);
    }

    /**
     * Rolls back the transaction and then fails with the original error.
     * <p>
     * A failure of the rollback itself never replaces the original error; it is attached to it as a
     * suppressed exception instead.
     *
     * @param tx the transaction to roll back
     * @param originalErr the failure that triggered the rollback
     * @param <T> the item type expected by the caller
     * @return a {@link Uni} that always fails with {@code originalErr}
     */
    private static <T> Uni<T> rollbackUni(Transaction tx, Throwable originalErr) {
        return tx.rollback().onItemOrFailure().transformToUni((res, err) -> {
            // Throwable#addSuppressed rejects self-suppression, hence the identity check.
            if (err != null && err != originalErr) {
                originalErr.addSuppressed(err);
            }
            return Uni.createFrom().failure(originalErr);
        });
    }

    /**
     * Generates a {@link Multi} from {@link SqlConnection} operations.
     * <p>
     * The connection is closed when the user-provided {@link Multi} terminates. If the supplier itself
     * throws, the connection is closed and the thrown error is propagated as a failure.
     *
     * @param pool the {@link Pool}
     * @param sourceSupplier a user-provided function returning a {@link Multi} generated by interacting with the given
     *        {@link SqlConnection}
     * @param <T> the type of the items emitted by the {@link Multi}
     * @return a {@link Multi} generated from {@link SqlConnection} operations
     */
    public static <T> Multi<T> usingConnectionMulti(Pool pool, Function<SqlConnection, Multi<T>> sourceSupplier) {
        return pool.getConnection().onItem().transformToMulti(conn -> {
            try {
                return sourceSupplier.apply(conn)
                        .onTermination().call(conn::close);
            } catch (Throwable t) {
                // The supplier failed before producing a stream: release the connection, keep the error.
                return SqlClientHelper.<T> closeAndFail(conn, t).toMulti();
            }
        });
    }

    /**
     * Generates a {@link Uni} from {@link SqlConnection} operations.
     * <p>
     * The connection is closed when the user-provided {@link Uni} terminates. If the supplier itself
     * throws, the connection is closed and the thrown error is propagated as a failure.
     *
     * @param pool the {@link Pool}
     * @param sourceSupplier a user-provided function returning a {@link Uni} generated by interacting with the given
     *        {@link SqlConnection}
     * @param <T> the type of the item emitted by the {@link Uni}
     * @return a {@link Uni} generated from {@link SqlConnection} operations
     */
    public static <T> Uni<T> usingConnectionUni(Pool pool, Function<SqlConnection, Uni<T>> sourceSupplier) {
        return pool.getConnection().onItem().transformToUni(conn -> {
            try {
                return sourceSupplier.apply(conn).onTermination().call(conn::close);
            } catch (Throwable t) {
                // The supplier failed before producing a Uni: release the connection, keep the error.
                return SqlClientHelper.<T> closeAndFail(conn, t);
            }
        });
    }

    /**
     * Closes the connection and then fails with the original error.
     * <p>
     * Whether the close succeeds or fails, the resulting {@link Uni} fails with {@code originalErr}; a
     * failure of the close is attached to it as a suppressed exception so that it cannot mask the error
     * that actually caused the connection to be released.
     *
     * @param conn the connection to close
     * @param originalErr the failure thrown by the user-provided function
     * @param <T> the item type expected by the caller
     * @return a {@link Uni} that always fails with {@code originalErr}
     */
    private static <T> Uni<T> closeAndFail(SqlConnection conn, Throwable originalErr) {
        return conn.close().onItemOrFailure().transformToUni((ignored, closeErr) -> {
            // Throwable#addSuppressed rejects self-suppression, hence the identity check.
            if (closeErr != null && closeErr != originalErr) {
                originalErr.addSuppressed(closeErr);
            }
            return Uni.createFrom().failure(originalErr);
        });
    }

    /** Not instantiable: this class only exposes static helpers. */
    private SqlClientHelper() {
        // Utility
    }
}