Skip to content

Commit

Permalink
Support for transactional listeners with reactive transactions
Browse files Browse the repository at this point in the history
TransactionalApplicationListener and TransactionalEventListener automatically detect a reactive TransactionContext as the event source and register the synchronization accordingly. TransactionalEventPublisher is a convenient delegate for publishing corresponding events with the current TransactionContext as event source. This can also serve as a guideline for similar reactive event purposes.

Closes gh-27515
Closes gh-21025
Closes gh-30244
  • Loading branch information
jhoeller committed Aug 1, 2023
1 parent a9d100e commit 450cc21
Show file tree
Hide file tree
Showing 10 changed files with 787 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ attribute of the annotation to `true`.

[NOTE]
====
`@TransactionalEventListener` only works with thread-bound transactions managed by
`PlatformTransactionManager`. A reactive transaction managed by `ReactiveTransactionManager`
uses the Reactor context instead of thread-local attributes, so from the perspective of
an event listener, there is no compatible active transaction that it can participate in.
As of 6.1, `@TransactionalEventListener` can work with thread-bound transactions managed by
`PlatformTransactionManager` as well as reactive transactions managed by `ReactiveTransactionManager`.
For the former, listeners are guaranteed to see the current thread-bound transaction.
Since the latter uses the Reactor context instead of thread-local variables, the transaction
context needs to be included in the published event instance as the event source.
See the
{api-spring-framework}/transaction/reactive/TransactionalEventPublisher.html[`TransactionalEventPublisher`]
javadoc for details.
====


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
* @see org.springframework.context.ApplicationEvent
* @see org.springframework.context.event.ApplicationEventMulticaster
* @see org.springframework.context.event.EventPublicationInterceptor
* @see org.springframework.transaction.event.TransactionalApplicationListener
*/
@FunctionalInterface
public interface ApplicationEventPublisher {
Expand All @@ -42,8 +43,21 @@ public interface ApplicationEventPublisher {
* or even immediate execution at all. Event listeners are encouraged
* to be as efficient as possible, individually using asynchronous
* execution for longer-running and potentially blocking operations.
* <p>For usage in a reactive call stack, include event publication
* as a simple hand-off:
* {@code Mono.fromRunnable(() -> eventPublisher.publishEvent(...))}.
* As with any asynchronous execution, thread-local data is not going
* to be available for reactive listener methods. All state which is
* necessary to process the event needs to be included in the event
* instance itself.
* <p>For the convenient inclusion of the current transaction context
* in a reactive hand-off, consider using
* {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Function)}.
* For thread-bound transactions, this is not necessary since the
* state will be implicitly available through thread-local storage.
* @param event the event to publish
* @see #publishEvent(Object)
* @see ApplicationListener#supportsAsyncExecution()
* @see org.springframework.context.event.ContextRefreshedEvent
* @see org.springframework.context.event.ContextClosedEvent
*/
Expand All @@ -61,6 +75,11 @@ default void publishEvent(ApplicationEvent event) {
* or even immediate execution at all. Event listeners are encouraged
* to be as efficient as possible, individually using asynchronous
* execution for longer-running and potentially blocking operations.
* <p>For the convenient inclusion of the current transaction context
* in a reactive hand-off, consider using
* {@link org.springframework.transaction.reactive.TransactionalEventPublisher#publishEvent(Object)}.
* For thread-bound transactions, this is not necessary since the
* state will be implicitly available through thread-local storage.
* @param event the event to publish
* @since 4.2
* @see #publishEvent(ApplicationEvent)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,12 +32,13 @@
* allows you to prioritize that listener amongst other listeners running before or after
* transaction completion.
*
* <p><b>NOTE: Transactional event listeners only work with thread-bound transactions
* managed by a {@link org.springframework.transaction.PlatformTransactionManager
* PlatformTransactionManager}.</b> A reactive transaction managed by a
* {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager}
* uses the Reactor context instead of thread-local variables, so from the perspective of
* an event listener, there is no compatible active transaction that it can participate in.
* <p>As of 6.1, transactional event listeners can work with thread-bound transactions managed
* by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive
* transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}.
* For the former, listeners are guaranteed to see the current thread-bound transaction.
* Since the latter uses the Reactor context instead of thread-local variables, the transaction
* context needs to be included in the published event instance as the event source:
* see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}.
*
* @author Juergen Hoeller
* @author Oliver Drotbohm
Expand All @@ -60,6 +61,16 @@ default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}

/**
* Transaction-synchronized listeners do not support asynchronous execution,
* only their target listener ({@link #processEvent}) potentially does.
* @since 6.1
*/
@Override
default boolean supportsAsyncExecution() {
return false;
}

/**
* Return an identifier for the listener to be able to refer to it individually.
* <p>It might be necessary for specific completion callback implementations
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,6 @@
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -128,11 +127,7 @@ public void processEvent(E event) {

@Override
public void onApplicationEvent(E event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.springframework.context.event.EventListener;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -87,10 +86,10 @@ public void addCallback(SynchronizationCallback callback) {

@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
if (TransactionalApplicationListenerSynchronization.register(event, this, this.callbacks)) {
if (logger.isDebugEnabled()) {
logger.debug("Registered transaction synchronization for " + event);
}
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,19 +18,21 @@

import java.util.List;

import reactor.core.publisher.Mono;

import org.springframework.context.ApplicationEvent;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.core.Ordered;
import org.springframework.transaction.reactive.TransactionContext;

/**
* {@link TransactionSynchronization} implementation for event processing with a
* {@code TransactionSynchronization} implementations for event processing with a
* {@link TransactionalApplicationListener}.
*
* @author Juergen Hoeller
* @since 5.3
* @param <E> the specific {@code ApplicationEvent} subclass to listen to
*/
class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
implements TransactionSynchronization {
abstract class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent> implements Ordered {

private final E event;

Expand All @@ -53,28 +55,11 @@ public int getOrder() {
return this.listener.getOrder();
}

@Override
public void beforeCommit(boolean readOnly) {
if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}

@Override
public void afterCompletion(int status) {
TransactionPhase phase = this.listener.getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
public TransactionPhase getTransactionPhase() {
return this.listener.getTransactionPhase();
}

private void processEventWithCallbacks() {
public void processEventWithCallbacks() {
this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
try {
this.listener.processEvent(this.event);
Expand All @@ -86,4 +71,94 @@ private void processEventWithCallbacks() {
this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
}


public static <E extends ApplicationEvent> boolean register(
E event, TransactionalApplicationListener<E> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {

if (org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive() &&
org.springframework.transaction.support.TransactionSynchronizationManager.isActualTransactionActive()) {
org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
new PlatformSynchronization<>(event, listener, callbacks));
return true;
}
else if (event.getSource() instanceof TransactionContext txContext) {
org.springframework.transaction.reactive.TransactionSynchronizationManager rtsm =
new org.springframework.transaction.reactive.TransactionSynchronizationManager(txContext);
if (rtsm.isSynchronizationActive() && rtsm.isActualTransactionActive()) {
rtsm.registerSynchronization(new ReactiveSynchronization<>(event, listener, callbacks));
return true;
}
}
return false;
}


private static class PlatformSynchronization<AE extends ApplicationEvent>
extends TransactionalApplicationListenerSynchronization<AE>
implements org.springframework.transaction.support.TransactionSynchronization {

public PlatformSynchronization(AE event, TransactionalApplicationListener<AE> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {

super(event, listener, callbacks);
}

@Override
public void beforeCommit(boolean readOnly) {
if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
processEventWithCallbacks();
}
}

@Override
public void afterCompletion(int status) {
TransactionPhase phase = getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEventWithCallbacks();
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
processEventWithCallbacks();
}
}
}


private static class ReactiveSynchronization<AE extends ApplicationEvent>
extends TransactionalApplicationListenerSynchronization<AE>
implements org.springframework.transaction.reactive.TransactionSynchronization {

public ReactiveSynchronization(AE event, TransactionalApplicationListener<AE> listener,
List<TransactionalApplicationListener.SynchronizationCallback> callbacks) {

super(event, listener, callbacks);
}

@Override
public Mono<Void> beforeCommit(boolean readOnly) {
if (getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
return Mono.empty();
}

@Override
public Mono<Void> afterCompletion(int status) {
TransactionPhase phase = getTransactionPhase();
if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
else if (phase == TransactionPhase.AFTER_COMPLETION) {
return Mono.fromRunnable(this::processEventWithCallbacks);
}
return Mono.empty();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,12 +37,13 @@
* method allows you to prioritize that listener amongst other listeners running before
* or after transaction completion.
*
* <p><b>NOTE: Transactional event listeners only work with thread-bound transactions
* managed by a {@link org.springframework.transaction.PlatformTransactionManager
* PlatformTransactionManager}.</b> A reactive transaction managed by a
* {@link org.springframework.transaction.ReactiveTransactionManager ReactiveTransactionManager}
* uses the Reactor context instead of thread-local variables, so from the perspective of
* an event listener, there is no compatible active transaction that it can participate in.
* <p>As of 6.1, transactional event listeners can work with thread-bound transactions managed
* by a {@link org.springframework.transaction.PlatformTransactionManager} as well as reactive
* transactions managed by a {@link org.springframework.transaction.ReactiveTransactionManager}.
* For the former, listeners are guaranteed to see the current thread-bound transaction.
* Since the latter uses the Reactor context instead of thread-local variables, the transaction
* context needs to be included in the published event instance as the event source:
* see {@link org.springframework.transaction.reactive.TransactionalEventPublisher}.
*
* <p><strong>WARNING:</strong> if the {@code TransactionPhase} is set to
* {@link TransactionPhase#AFTER_COMMIT AFTER_COMMIT} (the default),
Expand Down

0 comments on commit 450cc21

Please sign in to comment.