From 8562972c93dd43c0800262f126499cb58e03feaa Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 30 Oct 2019 16:23:37 +0100 Subject: [PATCH] Consider target transaction manager for reactive transaction decision Closes gh-23832 --- .../interceptor/TransactionAspectSupport.java | 173 ++++++------------ .../TransactionInterceptorTests.java | 26 +-- 2 files changed, 74 insertions(+), 125 deletions(-) diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index c8150bef556a..c2d9bf5599ab 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -173,7 +173,11 @@ public static TransactionStatus currentTransactionStatus() throws NoTransactionE @Nullable private BeanFactory beanFactory; - private final ConcurrentMap transactionManagerCache = new ConcurrentReferenceHashMap<>(4); + private final ConcurrentMap transactionManagerCache = + new ConcurrentReferenceHashMap<>(4); + + private final ConcurrentMap transactionSupportCache = + new ConcurrentReferenceHashMap<>(1024); protected TransactionAspectSupport() { @@ -301,7 +305,7 @@ public void afterPropertiesSet() { if (getTransactionManager() == null && this.beanFactory == null) { throw new IllegalStateException( "Set the 'transactionManager' property or make sure to run within a BeanFactory " + - "containing a PlatformTransactionManager bean!"); + "containing a TransactionManager bean!"); } if (getTransactionAttributeSource() == null) { throw new IllegalStateException( @@ -325,26 +329,35 @@ public void afterPropertiesSet() { protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass, final InvocationCallback invocation) throws Throwable { - if (this.reactiveAdapterRegistry != null) { - if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { - throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: " - + method + ". Use TransactionalOperator.transactional extensions instead."); - } - ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); - if (adapter != null) { - return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation); - } - } - // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); - final PlatformTransactionManager tm = determineTransactionManager(txAttr); + final TransactionManager tm = determineTransactionManager(txAttr); + + if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { + ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { + if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { + throw new TransactionUsageException( + "Unsupported annotated transaction on suspending function detected: " + method + + ". Use TransactionalOperator.transactional extensions instead."); + } + ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); + if (adapter == null) { + throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + + method.getReturnType()); + } + return new ReactiveTransactionSupport(adapter); + }); + return txSupport.invokeWithinTransaction( + method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); + } + + PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); - if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { + if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. - TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); + TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { @@ -378,8 +391,8 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class targe // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { - Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { - TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); + Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { + TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { @@ -446,10 +459,10 @@ protected void clearTransactionManagerCache() { * Determine the specific transaction manager to use for the given transaction. */ @Nullable - protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { + protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { // Do not attempt to lookup tx manager if no tx attributes are set if (txAttr == null || this.beanFactory == null) { - return asPlatformTransactionManager(getTransactionManager()); + return getTransactionManager(); } String qualifier = txAttr.getQualifier(); @@ -460,12 +473,11 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { - PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager()); + TransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { - defaultTransactionManager = asPlatformTransactionManager( - this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); + defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { - defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); + defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } @@ -474,11 +486,11 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) { } } - private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { - PlatformTransactionManager txManager = asPlatformTransactionManager(this.transactionManagerCache.get(qualifier)); + private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { + TransactionManager txManager = this.transactionManagerCache.get(qualifier); if (txManager == null) { txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( - beanFactory, PlatformTransactionManager.class, qualifier); + beanFactory, TransactionManager.class, qualifier); this.transactionManagerCache.putIfAbsent(qualifier, txManager); } return txManager; @@ -841,33 +853,30 @@ public ReactiveTransactionSupport(ReactiveAdapter adapter) { this.adapter = adapter; } - public Object invokeWithinTransaction(Method method, @Nullable Class targetClass, InvocationCallback invocation) { - // If the transaction attribute is null, the method is non-transactional. - TransactionAttributeSource tas = getTransactionAttributeSource(); - TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); - ReactiveTransactionManager tm = determineTransactionManager(txAttr); + public Object invokeWithinTransaction(Method method, @Nullable Class targetClass, + InvocationCallback invocation, @Nullable TransactionAttribute txAttr, ReactiveTransactionManager rtm) { + String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // Optimize for Mono if (Mono.class.isAssignableFrom(method.getReturnType())) { return TransactionContextManager.currentContext().flatMap(context -> - createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> { + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> { try { // Need re-wrapping until we get hold of the exception through usingWhen. - return Mono - .usingWhen( - Mono.just(it), - txInfo -> { - try { - return (Mono) invocation.proceedWithInvocation(); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }, - this::commitTransactionAfterReturning, - (txInfo, err) -> Mono.empty(), - this::commitTransactionAfterReturning) + return Mono.usingWhen( + Mono.just(it), + txInfo -> { + try { + return (Mono) invocation.proceedWithInvocation(); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }, + this::commitTransactionAfterReturning, + (txInfo, err) -> Mono.empty(), + this::commitTransactionAfterReturning) .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); } @@ -881,7 +890,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl // Any other reactive type, typically a Flux return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context -> - createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> { + createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> { try { // Need re-wrapping until we get hold of the exception through usingWhen. return Flux @@ -909,58 +918,8 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl .subscriberContext(TransactionContextManager.getOrCreateContextHolder())); } - @Nullable - private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { - // Do not attempt to lookup tx manager if no tx attributes are set - if (txAttr == null || beanFactory == null) { - return asReactiveTransactionManager(getTransactionManager()); - } - - String qualifier = txAttr.getQualifier(); - if (StringUtils.hasText(qualifier)) { - return determineQualifiedTransactionManager(beanFactory, qualifier); - } - else if (StringUtils.hasText(transactionManagerBeanName)) { - return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName); - } - else { - ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager()); - if (defaultTransactionManager == null) { - defaultTransactionManager = asReactiveTransactionManager( - transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); - if (defaultTransactionManager == null) { - defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class); - transactionManagerCache.putIfAbsent( - DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); - } - } - return defaultTransactionManager; - } - } - - private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { - ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier)); - if (txManager == null) { - txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( - beanFactory, ReactiveTransactionManager.class, qualifier); - transactionManagerCache.putIfAbsent(qualifier, txManager); - } - return txManager; - } - - @Nullable - private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) { - if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) { - return (ReactiveTransactionManager) transactionManager; - } - else { - throw new IllegalStateException( - "Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager); - } - } - @SuppressWarnings("serial") - private Mono createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm, + private Mono createTransactionIfNecessary(ReactiveTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. @@ -972,21 +931,9 @@ public String getName() { } }; } - TransactionAttribute attrToUse = txAttr; - - Mono tx = Mono.empty(); - if (txAttr != null) { - if (tm != null) { - tx = tm.getReactiveTransaction(txAttr); - } - else { - if (logger.isDebugEnabled()) { - logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + - "] because no transaction manager has been configured"); - } - } - } + final TransactionAttribute attrToUse = txAttr; + Mono tx = (attrToUse != null ? tm.getReactiveTransaction(attrToUse) : Mono.empty()); return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)).switchIfEmpty( Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null)))); } diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/TransactionInterceptorTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/TransactionInterceptorTests.java index 082bc4734277..bc138e813cee 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/interceptor/TransactionInterceptorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/TransactionInterceptorTests.java @@ -28,6 +28,7 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.util.SerializationTestUtils; @@ -42,12 +43,13 @@ * Mock object based tests for TransactionInterceptor. * * @author Rod Johnson + * @author Juergen Hoeller * @since 16.03.2003 */ public class TransactionInterceptorTests extends AbstractTransactionAspectTests { @Override - protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) throws Exception { + protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) { TransactionInterceptor ti = new TransactionInterceptor(); ti.setTransactionManager(ptm); ti.setTransactionAttributeSources(tas); @@ -214,14 +216,14 @@ public void determineTransactionManagerWithQualifierSeveralTimes() { DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); attribute.setQualifier("fooTransactionManager"); - PlatformTransactionManager actual = ti.determineTransactionManager(attribute); + TransactionManager actual = ti.determineTransactionManager(attribute); assertThat(actual).isSameAs(txManager); // Call again, should be cached - PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute); + TransactionManager actual2 = ti.determineTransactionManager(attribute); assertThat(actual2).isSameAs(txManager); verify(beanFactory, times(1)).containsBean("fooTransactionManager"); - verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class); + verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class); } @Test @@ -233,13 +235,13 @@ public void determineTransactionManagerWithBeanNameSeveralTimes() { PlatformTransactionManager txManager = associateTransactionManager(beanFactory, "fooTransactionManager"); DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); - PlatformTransactionManager actual = ti.determineTransactionManager(attribute); + TransactionManager actual = ti.determineTransactionManager(attribute); assertThat(actual).isSameAs(txManager); // Call again, should be cached - PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute); + TransactionManager actual2 = ti.determineTransactionManager(attribute); assertThat(actual2).isSameAs(txManager); - verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class); + verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class); } @Test @@ -248,16 +250,16 @@ public void determineTransactionManagerDefaultSeveralTimes() { TransactionInterceptor ti = simpleTransactionInterceptor(beanFactory); PlatformTransactionManager txManager = mock(PlatformTransactionManager.class); - given(beanFactory.getBean(PlatformTransactionManager.class)).willReturn(txManager); + given(beanFactory.getBean(TransactionManager.class)).willReturn(txManager); DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); - PlatformTransactionManager actual = ti.determineTransactionManager(attribute); + TransactionManager actual = ti.determineTransactionManager(attribute); assertThat(actual).isSameAs(txManager); // Call again, should be cached - PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute); + TransactionManager actual2 = ti.determineTransactionManager(attribute); assertThat(actual2).isSameAs(txManager); - verify(beanFactory, times(1)).getBean(PlatformTransactionManager.class); + verify(beanFactory, times(1)).getBean(TransactionManager.class); } @@ -299,7 +301,7 @@ private TransactionInterceptor simpleTransactionInterceptor(BeanFactory beanFact private PlatformTransactionManager associateTransactionManager(BeanFactory beanFactory, String name) { PlatformTransactionManager transactionManager = mock(PlatformTransactionManager.class); given(beanFactory.containsBean(name)).willReturn(true); - given(beanFactory.getBean(name, PlatformTransactionManager.class)).willReturn(transactionManager); + given(beanFactory.getBean(name, TransactionManager.class)).willReturn(transactionManager); return transactionManager; }