-
Notifications
You must be signed in to change notification settings - Fork 37.7k
/
ConnectionFactoryUtils.java
408 lines (375 loc) · 17.8 KB
/
ConnectionFactoryUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.r2dbc.connection;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcPermissionDeniedException;
import io.r2dbc.spi.R2dbcRollbackException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.R2dbcTransientException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.Wrapped;
import reactor.core.publisher.Mono;
import org.springframework.core.Ordered;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.PermissionDeniedDataAccessException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.lang.Nullable;
import org.springframework.r2dbc.BadSqlGrammarException;
import org.springframework.r2dbc.UncategorizedR2dbcException;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.reactive.TransactionSynchronization;
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
* Helper class that provides static methods for obtaining R2DBC Connections from
* a {@link ConnectionFactory}.
*
* <p>Used internally by Spring's {@code DatabaseClient} and Spring's R2DBC operation
* objects. Can also be used directly in application code.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 5.3
* @see R2dbcTransactionManager
* @see org.springframework.transaction.reactive.TransactionSynchronizationManager
*/
public abstract class ConnectionFactoryUtils {
/**
 * Base order value for the {@link TransactionSynchronization} callbacks that clean up
 * R2DBC Connections.
 * <p>{@code getConnectionSynchronizationOrder} starts from this value and decrements it
 * once for every level of {@link DelegatingConnectionFactory} nesting.
 */
public static final int CONNECTION_SYNCHRONIZATION_ORDER = 1000;
/**
 * Obtain a {@link Connection} from the given {@link ConnectionFactory}.
 * <p>Delegates to {@link #doGetConnection} and maps every error signal of the
 * resulting {@link Mono} to a {@link DataAccessResourceFailureException} that
 * carries the original error as its cause.
 * <p>Like {@link #doGetConnection}, this is aware of a Connection bound to the current
 * {@link TransactionSynchronizationManager} and binds a newly fetched Connection
 * if transaction synchronization is active.
 * @param connectionFactory the {@link ConnectionFactory} to obtain
 * {@link Connection Connections} from
 * @return a {@link Mono} emitting a R2DBC Connection from the given
 * {@link ConnectionFactory}; it signals a {@link DataAccessResourceFailureException}
 * if the attempt to get a {@link Connection} failed
 * @see #releaseConnection
 */
public static Mono<Connection> getConnection(ConnectionFactory connectionFactory) {
	Mono<Connection> obtained = doGetConnection(connectionFactory);
	return obtained.onErrorMap(
			cause -> new DataAccessResourceFailureException("Failed to obtain R2DBC Connection", cause));
}
/**
 * Actually obtain a R2DBC Connection from the given {@link ConnectionFactory}.
 * Same as {@link #getConnection}, but preserving the original exceptions.
 * <p>Looks up the {@link ConnectionHolder} bound for the factory in the current
 * {@link TransactionSynchronizationManager}:
 * <ul>
 * <li>If a holder is bound and it either has a Connection or is flagged as synchronized
 * with the transaction, {@code requested()} is called on it and its Connection is emitted.
 * A holder without a Connection is first filled with a freshly fetched one.</li>
 * <li>Otherwise a new Connection is fetched. If transaction synchronization is active,
 * the Connection is stored in the existing or a new holder, a
 * {@code ConnectionSynchronization} is registered, and a newly created holder is bound
 * to the factory. If that setup fails, the Connection is released and the error re-emitted.</li>
 * <li>If synchronization is not active, the fetched Connection is emitted as-is.</li>
 * </ul>
 * <p>If a {@link NoTransactionException} is signalled, a Connection is created directly
 * from the factory instead.
 * @param connectionFactory the {@link ConnectionFactory} to obtain Connections from
 * (checked with {@code Assert.notNull})
 * @return a {@link Mono} emitting a R2DBC {@link Connection} from the given
 * {@link ConnectionFactory}
 */
public static Mono<Connection> doGetConnection(ConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
// Resource bound for this factory, if any (may be null).
ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
// Reuse the bound holder; register this caller as a user of it.
conHolder.requested();
if (!conHolder.hasConnection()) {
// Holder is synchronized but currently empty: fetch a Connection and store it in the holder.
return fetchConnection(connectionFactory).doOnNext(conHolder::setConnection);
}
return Mono.just(conHolder.getConnection());
}
// Else we either got no holder or an empty bound holder here.
Mono<Connection> con = fetchConnection(connectionFactory);
if (synchronizationManager.isSynchronizationActive()) {
return con.flatMap(connection -> Mono.just(connection).doOnNext(conn -> {
// Use same Connection for further R2DBC actions within the transaction.
// The bound holder will get removed by the registered synchronization at transaction completion.
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(conn);
}
else {
holderToUse.setConnection(conn);
}
holderToUse.requested();
synchronizationManager
.registerSynchronization(new ConnectionSynchronization(holderToUse, connectionFactory));
holderToUse.setSynchronizedWithTransaction(true);
// Only a newly created holder needs binding; an existing one is already bound.
if (holderToUse != conHolder) {
synchronizationManager.bindResource(connectionFactory, holderToUse);
}
}) // Unexpected exception from external delegation call -> close Connection and rethrow.
.onErrorResume(e -> releaseConnection(connection, connectionFactory).then(Mono.error(e))));
}
// No synchronization active: hand out the unmanaged Connection.
return con;
// No transaction context available: fall back to a plain Connection from the factory.
}).onErrorResume(NoTransactionException.class, e -> Mono.from(connectionFactory.create()));
}
/**
 * Actually fetch a {@link Connection} from the given {@link ConnectionFactory}.
 * <p>Calls {@link ConnectionFactory#create()} and adapts the result with
 * {@code Mono.from}. No further validation of the result is performed here.
 * @param connectionFactory the {@link ConnectionFactory} to obtain
 * {@link Connection}s from
 * @return a {@link Mono} wrapping the result of {@link ConnectionFactory#create()}
 * @see ConnectionFactory#create()
 */
private static Mono<Connection> fetchConnection(ConnectionFactory connectionFactory) {
return Mono.from(connectionFactory.create());
}
/**
 * Release the given {@link Connection}, obtained from the given {@link ConnectionFactory}.
 * <p>Delegates to {@link #doReleaseConnection} and maps every error signal of the
 * resulting {@link Mono} to a {@link DataAccessResourceFailureException} that
 * carries the original error as its cause.
 * @param con the {@link Connection} to close if necessary
 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
 * @return a {@link Mono} that completes when the release has finished
 * @see #getConnection
 */
public static Mono<Void> releaseConnection(Connection con, ConnectionFactory connectionFactory) {
	Mono<Void> released = doReleaseConnection(con, connectionFactory);
	return released.onErrorMap(
			cause -> new DataAccessResourceFailureException("Failed to close R2DBC Connection", cause));
}
/**
 * Actually close the given {@link Connection}, obtained from the given
 * {@link ConnectionFactory}. Same as {@link #releaseConnection},
 * but preserving the original exception.
 * <p>If the Connection is the one held by the {@link ConnectionHolder} bound for the
 * factory in the current {@link TransactionSynchronizationManager}, it is left open and
 * only {@code released()} is called on the holder. Any other Connection is closed.
 * The Connection is also closed when a {@link NoTransactionException} is signalled.
 * @param connection the {@link Connection} to close if necessary
 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
 * @return a {@link Mono} that completes once the Connection has been released or closed
 * @see #doGetConnection
 */
public static Mono<Void> doReleaseConnection(Connection connection, ConnectionFactory connectionFactory) {
	return TransactionSynchronizationManager.forCurrentTransaction()
			.flatMap(synchronizationManager -> {
				ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory);
				if (conHolder != null && connectionEquals(conHolder, connection)) {
					// It's the transactional Connection: Don't close it. Only give up this
					// reference and return early, so the bound holder keeps an open Connection.
					conHolder.released();
					return Mono.<Void>empty();
				}
				// Not bound to the current transaction: close it right away.
				return Mono.from(connection.close());
			})
			// No transaction context at all: the Connection cannot be a managed one.
			.onErrorResume(NoTransactionException.class, e -> Mono.from(connection.close()));
}
/**
 * Emit the given {@link ConnectionFactory} if it takes part in the current transaction.
 * <p>The factory is emitted only if transaction synchronization is active on the current
 * {@link TransactionSynchronizationManager} and a {@link ConnectionHolder} is bound for
 * the factory that either has a Connection or is synchronized with the transaction.
 * Otherwise the returned {@link Mono} completes without a value.
 * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from
 * @return a {@link Mono} emitting {@code connectionFactory}, or an empty one
 * @see TransactionSynchronizationManager
 */
public static Mono<ConnectionFactory> currentConnectionFactory(ConnectionFactory connectionFactory) {
	return TransactionSynchronizationManager.forCurrentTransaction()
			.filter(manager -> manager.isSynchronizationActive())
			.filter(manager -> {
				ConnectionHolder holder = (ConnectionHolder) manager.getResource(connectionFactory);
				if (holder == null) {
					return false;
				}
				return holder.hasConnection() || holder.isSynchronizedWithTransaction();
			})
			.map(manager -> connectionFactory);
}
/**
 * Translate the given {@link R2dbcException} into a generic {@link DataAccessException}.
 * <p>Known subtypes of {@link R2dbcTransientException} and
 * {@link R2dbcNonTransientException} are mapped to a matching Spring exception.
 * Everything else becomes an {@link UncategorizedR2dbcException}. The original
 * exception is always passed along as the cause.
 * <p>Client code may not generally rely on the root cause being an
 * {@link R2dbcException}, since DataAccessExceptions can be caused by other resource
 * APIs as well. A {@code getRootCause() instanceof R2dbcException} check is
 * nevertheless considered reliable when R2DBC-based access is expected.
 * @param task readable text describing the task being attempted
 * @param sql the SQL query or update that caused the problem (if known)
 * @param ex the offending {@link R2dbcException}
 * @return the corresponding DataAccessException instance
 */
public static DataAccessException convertR2dbcException(String task, @Nullable String sql, R2dbcException ex) {
	DataAccessException translated = null;

	if (ex instanceof R2dbcTransientException) {
		if (ex instanceof R2dbcTransientResourceException) {
			translated = new TransientDataAccessResourceException(buildMessage(task, sql, ex), ex);
		}
		else if (ex instanceof R2dbcRollbackException) {
			translated = new ConcurrencyFailureException(buildMessage(task, sql, ex), ex);
		}
		else if (ex instanceof R2dbcTimeoutException) {
			translated = new QueryTimeoutException(buildMessage(task, sql, ex), ex);
		}
	}

	if (translated == null && ex instanceof R2dbcNonTransientException) {
		if (ex instanceof R2dbcNonTransientResourceException) {
			translated = new DataAccessResourceFailureException(buildMessage(task, sql, ex), ex);
		}
		else if (ex instanceof R2dbcDataIntegrityViolationException) {
			translated = new DataIntegrityViolationException(buildMessage(task, sql, ex), ex);
		}
		else if (ex instanceof R2dbcPermissionDeniedException) {
			translated = new PermissionDeniedDataAccessException(buildMessage(task, sql, ex), ex);
		}
		else if (ex instanceof R2dbcBadGrammarException) {
			// BadSqlGrammarException takes the task and a non-null SQL string directly.
			String statement = (sql == null ? "" : sql);
			translated = new BadSqlGrammarException(task, statement, ex);
		}
	}

	if (translated == null) {
		// No specific mapping applied.
		translated = new UncategorizedR2dbcException(buildMessage(task, sql, ex), sql, ex);
	}
	return translated;
}
/**
 * Build a message {@code String} for the given {@link R2dbcException}.
 * <p>The result has the form {@code task; SQL [sql]; exception-message}. The
 * {@code SQL [...]; } part is left out when no SQL is given.
 * @param task readable text describing the task being attempted
 * @param sql the SQL statement that caused the problem (may be {@code null})
 * @param ex the offending {@code R2dbcException}
 * @return the message {@code String} to use
 */
private static String buildMessage(String task, @Nullable String sql, R2dbcException ex) {
	StringBuilder message = new StringBuilder();
	message.append(task).append("; ");
	if (sql != null) {
		message.append("SQL [").append(sql).append("]; ");
	}
	message.append(ex.getMessage());
	return message.toString();
}
/**
 * Determine whether the {@link Connection} held by the given holder and the passed-in
 * {@link Connection} are the same, also asking the unwrapped target of the held
 * Connection. This detects equality even if the user passed in a raw target
 * Connection while the held one is a proxy.
 * @param conHolder the {@link ConnectionHolder} for the held {@link Connection} (potentially a proxy)
 * @param passedInCon the {@link Connection} passed-in by the user (potentially
 * a target {@link Connection} without proxy)
 * @return whether the given Connections are equal; {@code false} if the holder
 * has no Connection
 * @see #getTargetConnection
 */
private static boolean connectionEquals(ConnectionHolder conHolder, Connection passedInCon) {
	if (!conHolder.hasConnection()) {
		return false;
	}
	Connection held = conHolder.getConnection();
	// Identity is checked first, for Connection handles that do not implement
	// "equals" properly.
	if (held == passedInCon || held.equals(passedInCon)) {
		return true;
	}
	// Last resort: compare against the innermost target of the held Connection.
	return getTargetConnection(held).equals(passedInCon);
}
/**
 * Return the innermost target {@link Connection} of the given {@link Connection}.
 * <p>As long as the current candidate implements {@link Wrapped}, it is replaced by
 * the result of {@link Wrapped#unwrap()}. A Connection that is not wrapped is
 * returned as-is.
 * @param con the {@link Connection} wrapper to unwrap
 * @return the innermost target Connection, or the passed-in one if not wrapped
 * @see Wrapped#unwrap()
 */
@SuppressWarnings("unchecked")
public static Connection getTargetConnection(Connection con) {
	Connection candidate = con;
	while (true) {
		if (!(candidate instanceof Wrapped<?>)) {
			return candidate;
		}
		Wrapped<Connection> wrapper = (Wrapped<Connection>) candidate;
		candidate = wrapper.unwrap();
	}
}
/**
 * Determine the connection synchronization order to use for the given {@link ConnectionFactory}.
 * <p>The result is {@link #CONNECTION_SYNCHRONIZATION_ORDER} minus the number of
 * {@link DelegatingConnectionFactory} levels found by following
 * {@code getTargetConnectionFactory()} from the given factory.
 * @param connectionFactory the {@link ConnectionFactory} to check
 * @return the connection synchronization order to use
 * @see #CONNECTION_SYNCHRONIZATION_ORDER
 */
private static int getConnectionSynchronizationOrder(ConnectionFactory connectionFactory) {
	int nestingDepth = 0;
	ConnectionFactory candidate = connectionFactory;
	while (candidate instanceof DelegatingConnectionFactory) {
		DelegatingConnectionFactory delegating = (DelegatingConnectionFactory) candidate;
		candidate = delegating.getTargetConnectionFactory();
		nestingDepth++;
	}
	return CONNECTION_SYNCHRONIZATION_ORDER - nestingDepth;
}
/**
 * {@link TransactionSynchronization} that looks after the {@link ConnectionHolder} bound
 * for a {@link ConnectionFactory}: it unbinds and rebinds the holder on suspend and
 * resume, and releases the held Connection before or after transaction completion.
 */
private static class ConnectionSynchronization implements TransactionSynchronization, Ordered {

	private final ConnectionHolder connectionHolder;

	private final ConnectionFactory connectionFactory;

	private final int order;

	/** Cleared once the holder has been unbound in a completion callback. */
	private boolean holderActive = true;

	ConnectionSynchronization(ConnectionHolder connectionHolder, ConnectionFactory connectionFactory) {
		this.connectionHolder = connectionHolder;
		this.connectionFactory = connectionFactory;
		this.order = getConnectionSynchronizationOrder(connectionFactory);
	}

	@Override
	public int getOrder() {
		return this.order;
	}

	/**
	 * Unbind the holder and, if it has a Connection but is no longer open,
	 * release that Connection and clear it from the holder.
	 */
	@Override
	public Mono<Void> suspend() {
		if (!this.holderActive) {
			return Mono.empty();
		}
		return TransactionSynchronizationManager.forCurrentTransaction().flatMap(manager -> {
			manager.unbindResource(this.connectionFactory);
			boolean releasable = this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen();
			if (!releasable) {
				return Mono.empty();
			}
			// Release Connection on suspend if the application doesn't keep
			// a handle to it anymore. A fresh Connection will be fetched if the
			// application accesses the ConnectionHolder again after resume,
			// assuming that it will participate in the same transaction.
			return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory)
					.doOnTerminate(() -> this.connectionHolder.setConnection(null));
		});
	}

	/**
	 * Bind the holder to the connection factory again, unless it has been deactivated.
	 */
	@Override
	public Mono<Void> resume() {
		if (!this.holderActive) {
			return Mono.empty();
		}
		return TransactionSynchronizationManager.forCurrentTransaction()
				.doOnNext(manager -> manager.bindResource(this.connectionFactory, this.connectionHolder))
				.then();
	}

	/**
	 * Release the Connection early if the holder is not open anymore (that is,
	 * not used by another resource that has its own cleanup via transaction
	 * synchronization), to avoid issues with strict transaction implementations
	 * that expect the close call before transaction completion.
	 */
	@Override
	public Mono<Void> beforeCompletion() {
		if (this.connectionHolder.isOpen()) {
			return Mono.empty();
		}
		return TransactionSynchronizationManager.forCurrentTransaction().flatMap(manager -> {
			manager.unbindResource(this.connectionFactory);
			this.holderActive = false;
			if (!this.connectionHolder.hasConnection()) {
				return Mono.empty();
			}
			return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory);
		});
	}

	/**
	 * Release the Connection now if that has not happened in {@code beforeCompletion};
	 * otherwise just reset the holder.
	 */
	@Override
	public Mono<Void> afterCompletion(int status) {
		if (!this.holderActive) {
			this.connectionHolder.reset();
			return Mono.empty();
		}
		// The bound ConnectionHolder might not be available anymore,
		// since afterCompletion might get called from a different thread.
		return TransactionSynchronizationManager.forCurrentTransaction().flatMap(manager -> {
			manager.unbindResourceIfPossible(this.connectionFactory);
			this.holderActive = false;
			if (!this.connectionHolder.hasConnection()) {
				return Mono.empty();
			}
			return releaseConnection(this.connectionHolder.getConnection(), this.connectionFactory)
					// Reset the ConnectionHolder: It might remain bound to the context.
					.doOnTerminate(() -> this.connectionHolder.setConnection(null));
		});
	}
}
}