Skip to content

Commit

Permalink
Make coroutines with custom AOP aspects work with @Transactional
Browse files Browse the repository at this point in the history
Previous to this change, the transactional aspect would supersed the
user-defined AspectJ aspect, shortcircuiting to calling the original
Kotlin suspending function.

This change simplifies the TransactionAspectSupport way of dealing with
transactional coroutines, thanks to the fact that lower level support
for AOP has been introduced in c8169e5.

Closes gh-33095
  • Loading branch information
simonbasle committed Jul 10, 2024
1 parent 3ccaefe commit 1d890a8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
Expand All @@ -28,10 +31,18 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.EnableAspectJAutoProxy
import org.springframework.stereotype.Component
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import org.springframework.transaction.annotation.EnableTransactionManagement
import org.springframework.transaction.annotation.Transactional
import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager
import reactor.core.publisher.Mono
import java.lang.reflect.Method
import kotlin.annotation.AnnotationTarget.ANNOTATION_CLASS
import kotlin.annotation.AnnotationTarget.CLASS
import kotlin.annotation.AnnotationTarget.FUNCTION
import kotlin.annotation.AnnotationTarget.TYPE


/**
Expand All @@ -43,7 +54,9 @@ import java.lang.reflect.Method
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
@Autowired val echo: Echo,
@Autowired val firstAdvisor: TestPointcutAdvisor,
@Autowired val secondAdvisor: TestPointcutAdvisor) {
@Autowired val secondAdvisor: TestPointcutAdvisor,
@Autowired val countingAspect: CountingAspect,
@Autowired val reactiveTransactionManager: ReactiveCallCountingTransactionManager) {

@Test
fun `Multiple interceptors with regular function`() {
Expand All @@ -67,8 +80,22 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
}

@Test // gh-33095
fun `Aspect and reactive transactional with suspending function`() {
assertThat(countingAspect.counter).isZero()
assertThat(reactiveTransactionManager.commits).isZero()
val value = "Hello!"
runBlocking {
assertThat(echo.suspendingTransactionalEcho(value)).isEqualTo(value)
}
assertThat(countingAspect.counter).`as`("aspect applied").isOne()
assertThat(reactiveTransactionManager.begun).isOne()
assertThat(reactiveTransactionManager.commits).`as`("transactional applied").isOne()
}

@Configuration
@EnableAspectJAutoProxy
@EnableTransactionManagement
open class InterceptorConfig {

@Bean
Expand All @@ -77,6 +104,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
@Bean
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }

@Bean
open fun countingAspect() = CountingAspect()

@Bean
open fun transactionManager(): ReactiveCallCountingTransactionManager {
return ReactiveCallCountingTransactionManager()
}

@Bean
open fun echo(): Echo {
Expand Down Expand Up @@ -107,6 +141,24 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
}
}

@Target(CLASS, FUNCTION, ANNOTATION_CLASS, TYPE)
@Retention(AnnotationRetention.RUNTIME)
annotation class Counting()

@Aspect
@Component
class CountingAspect {

var counter: Long = 0

@Around("@annotation(org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.Counting)")
fun logging(joinPoint: ProceedingJoinPoint): Any {
return (joinPoint.proceed(joinPoint.args) as Mono<*>).doOnTerminate {
counter++
}
}
}

open class Echo {

open fun echo(value: String): String {
Expand All @@ -118,6 +170,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests(
return value
}

@Transactional
@Counting
open suspend fun suspendingTransactionalEcho(value: String): String {
delay(1)
return value
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,15 @@
import java.util.concurrent.Future;

import io.vavr.control.Try;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.Job;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.NamedThreadLocal;
Expand Down Expand Up @@ -355,10 +350,6 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
throw new IllegalStateException("Coroutines invocation not supported: " + method);
}
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);

ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
Class<?> reactiveType =
Expand All @@ -371,11 +362,7 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
return new ReactiveTransactionSupport(adapter);
});

InvocationCallback callback = invocation;
if (corInv != null) {
callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
}
return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm);
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
Expand Down Expand Up @@ -829,22 +816,6 @@ protected interface InvocationCallback {
}


/**
* Coroutines-supporting extension of the callback interface.
*/
protected interface CoroutinesInvocationCallback extends InvocationCallback {

Object getTarget();

Object[] getArguments();

default Object getContinuation() {
Object[] args = getArguments();
return args[args.length - 1];
}
}


/**
* Internal holder class for a Throwable in a callback transaction model.
*/
Expand Down Expand Up @@ -891,18 +862,6 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt
}
}

/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
private static class KotlinDelegate {

public static Publisher<?> invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) {
CoroutineContext coroutineContext = ((Continuation<?>) callback.getContinuation()).getContext().minusKey(Job.Key);
return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments());
}

}


/**
* Delegate for Reactor-based management of transactional methods with a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}


Expand Down

0 comments on commit 1d890a8

Please sign in to comment.