Skip to content

Commit

Permalink
Consider target transaction manager for reactive transaction decision
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoeller authored and pull[bot] committed Oct 30, 2019
1 parent 23e04d6 commit 8562972
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ public static TransactionStatus currentTransactionStatus() throws NoTransactionE
@Nullable
private BeanFactory beanFactory;

private final ConcurrentMap<Object, Object> transactionManagerCache = new ConcurrentReferenceHashMap<>(4);
private final ConcurrentMap<Object, TransactionManager> transactionManagerCache =
new ConcurrentReferenceHashMap<>(4);

private final ConcurrentMap<Method, ReactiveTransactionSupport> transactionSupportCache =
new ConcurrentReferenceHashMap<>(1024);


protected TransactionAspectSupport() {
Expand Down Expand Up @@ -301,7 +305,7 @@ public void afterPropertiesSet() {
if (getTransactionManager() == null && this.beanFactory == null) {
throw new IllegalStateException(
"Set the 'transactionManager' property or make sure to run within a BeanFactory " +
"containing a PlatformTransactionManager bean!");
"containing a TransactionManager bean!");
}
if (getTransactionAttributeSource() == null) {
throw new IllegalStateException(
Expand All @@ -325,26 +329,35 @@ public void afterPropertiesSet() {
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

if (this.reactiveAdapterRegistry != null) {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: "
+ method + ". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter != null) {
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
}
}

// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final TransactionManager tm = determineTransactionManager(txAttr);

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
Expand Down Expand Up @@ -378,8 +391,8 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe

// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
Expand Down Expand Up @@ -446,10 +459,10 @@ protected void clearTransactionManagerCache() {
* Determine the specific transaction manager to use for the given transaction.
*/
@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || this.beanFactory == null) {
return asPlatformTransactionManager(getTransactionManager());
return getTransactionManager();
}

String qualifier = txAttr.getQualifier();
Expand All @@ -460,12 +473,11 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = asPlatformTransactionManager(
this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
Expand All @@ -474,11 +486,11 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) {
}
}

private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
PlatformTransactionManager txManager = asPlatformTransactionManager(this.transactionManagerCache.get(qualifier));
private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
TransactionManager txManager = this.transactionManagerCache.get(qualifier);
if (txManager == null) {
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
beanFactory, PlatformTransactionManager.class, qualifier);
beanFactory, TransactionManager.class, qualifier);
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
Expand Down Expand Up @@ -841,33 +853,30 @@ public ReactiveTransactionSupport(ReactiveAdapter adapter) {
this.adapter = adapter;
}

public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation) {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
ReactiveTransactionManager tm = determineTransactionManager(txAttr);
public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
InvocationCallback invocation, @Nullable TransactionAttribute txAttr, ReactiveTransactionManager rtm) {

String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// Optimize for Mono
if (Mono.class.isAssignableFrom(method.getReturnType())) {
return TransactionContextManager.currentContext().flatMap(context ->
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> {
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Mono
.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::commitTransactionAfterReturning)
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::commitTransactionAfterReturning)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
Expand All @@ -881,7 +890,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl

// Any other reactive type, typically a Flux
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> {
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Flux
Expand Down Expand Up @@ -909,58 +918,8 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()));
}

@Nullable
private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || beanFactory == null) {
return asReactiveTransactionManager(getTransactionManager());
}

String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(beanFactory, qualifier);
}
else if (StringUtils.hasText(transactionManagerBeanName)) {
return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName);
}
else {
ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager());
if (defaultTransactionManager == null) {
defaultTransactionManager = asReactiveTransactionManager(
transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
if (defaultTransactionManager == null) {
defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class);
transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}

private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier));
if (txManager == null) {
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
beanFactory, ReactiveTransactionManager.class, qualifier);
transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}

@Nullable
private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) {
if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) {
return (ReactiveTransactionManager) transactionManager;
}
else {
throw new IllegalStateException(
"Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager);
}
}

@SuppressWarnings("serial")
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm,
private Mono<ReactiveTransactionInfo> createTransactionIfNecessary(ReactiveTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
Expand All @@ -972,21 +931,9 @@ public String getName() {
}
};
}
TransactionAttribute attrToUse = txAttr;

Mono<ReactiveTransaction> tx = Mono.empty();
if (txAttr != null) {
if (tm != null) {
tx = tm.getReactiveTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}

final TransactionAttribute attrToUse = txAttr;
Mono<ReactiveTransaction> tx = (attrToUse != null ? tm.getReactiveTransaction(attrToUse) : Mono.empty());
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)).switchIfEmpty(
Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.SerializationTestUtils;

Expand All @@ -42,12 +43,13 @@
* Mock object based tests for TransactionInterceptor.
*
* @author Rod Johnson
* @author Juergen Hoeller
* @since 16.03.2003
*/
public class TransactionInterceptorTests extends AbstractTransactionAspectTests {

@Override
protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) throws Exception {
protected Object advised(Object target, PlatformTransactionManager ptm, TransactionAttributeSource[] tas) {
TransactionInterceptor ti = new TransactionInterceptor();
ti.setTransactionManager(ptm);
ti.setTransactionAttributeSources(tas);
Expand Down Expand Up @@ -214,14 +216,14 @@ public void determineTransactionManagerWithQualifierSeveralTimes() {

DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setQualifier("fooTransactionManager");
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
TransactionManager actual = ti.determineTransactionManager(attribute);
assertThat(actual).isSameAs(txManager);

// Call again, should be cached
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
TransactionManager actual2 = ti.determineTransactionManager(attribute);
assertThat(actual2).isSameAs(txManager);
verify(beanFactory, times(1)).containsBean("fooTransactionManager");
verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class);
verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class);
}

@Test
Expand All @@ -233,13 +235,13 @@ public void determineTransactionManagerWithBeanNameSeveralTimes() {
PlatformTransactionManager txManager = associateTransactionManager(beanFactory, "fooTransactionManager");

DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
TransactionManager actual = ti.determineTransactionManager(attribute);
assertThat(actual).isSameAs(txManager);

// Call again, should be cached
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
TransactionManager actual2 = ti.determineTransactionManager(attribute);
assertThat(actual2).isSameAs(txManager);
verify(beanFactory, times(1)).getBean("fooTransactionManager", PlatformTransactionManager.class);
verify(beanFactory, times(1)).getBean("fooTransactionManager", TransactionManager.class);
}

@Test
Expand All @@ -248,16 +250,16 @@ public void determineTransactionManagerDefaultSeveralTimes() {
TransactionInterceptor ti = simpleTransactionInterceptor(beanFactory);

PlatformTransactionManager txManager = mock(PlatformTransactionManager.class);
given(beanFactory.getBean(PlatformTransactionManager.class)).willReturn(txManager);
given(beanFactory.getBean(TransactionManager.class)).willReturn(txManager);

DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
PlatformTransactionManager actual = ti.determineTransactionManager(attribute);
TransactionManager actual = ti.determineTransactionManager(attribute);
assertThat(actual).isSameAs(txManager);

// Call again, should be cached
PlatformTransactionManager actual2 = ti.determineTransactionManager(attribute);
TransactionManager actual2 = ti.determineTransactionManager(attribute);
assertThat(actual2).isSameAs(txManager);
verify(beanFactory, times(1)).getBean(PlatformTransactionManager.class);
verify(beanFactory, times(1)).getBean(TransactionManager.class);
}


Expand Down Expand Up @@ -299,7 +301,7 @@ private TransactionInterceptor simpleTransactionInterceptor(BeanFactory beanFact
private PlatformTransactionManager associateTransactionManager(BeanFactory beanFactory, String name) {
PlatformTransactionManager transactionManager = mock(PlatformTransactionManager.class);
given(beanFactory.containsBean(name)).willReturn(true);
given(beanFactory.getBean(name, PlatformTransactionManager.class)).willReturn(transactionManager);
given(beanFactory.getBean(name, TransactionManager.class)).willReturn(transactionManager);
return transactionManager;
}

Expand Down

0 comments on commit 8562972

Please sign in to comment.