Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support @Scheduled on Reactive methods and Kotlin suspending functions #29924

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 111 additions & 0 deletions framework-docs/modules/ROOT/pages/integration/scheduling.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,117 @@ container and once through the `@Configurable` aspect), with the consequence of
`@Scheduled` method being invoked twice.
====

[[scheduling-annotation-support-scheduled-reactive]]
=== The `@Scheduled` annotation on Reactive methods or Kotlin suspending functions

As of Spring Framework 6.1, `@Scheduled` methods are also supported on several types
of reactive methods:

- methods with a `Publisher` return type (or any concrete implementation of `Publisher`)
like in the following example:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
public Publisher<Void> reactiveSomething() {
// return an instance of Publisher
}
----

- methods with a return type that can be adapted to `Publisher` via the shared instance
of the `ReactiveAdapterRegistry`, provided the type supports _deferred subscription_ like
in the following example:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
public Single<String> rxjavaNonPublisher() {
return Single.just("example");
}
----

[NOTE]
====
The `CompletableFuture` class is an example of a type that can typically be adapted
to `Publisher` but doesn't support deferred subscription. Its `ReactiveAdapter` in the
registry denotes that by having the `getDescriptor().isDeferred()` method return `false`.
====


- Kotlin suspending functions, like in the following example:

[source,kotlin,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
suspend fun something() {
// do something asynchronous
}
----

- methods that return a Kotlin `Flow` or `Deferred` instance, like in the following example:

[source,kotlin,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
fun something(): Flow<Void> {
flow {
// do something asynchronous
}
}
----

All these types of methods must be declared without any arguments. In the case of Kotlin
suspending functions the `kotlinx.coroutines.reactor` bridge must also be present to allow
the framework to invoke a suspending function as a `Publisher`.

The Spring Framework will obtain a `Publisher` out of the annotated method once and will
schedule a `Runnable` in which it subscribes to said `Publisher`. These inner regular
subscriptions happen according to the `cron`/fixedDelay`/`fixedRate` configuration.

If the `Publisher` emits `onNext` signal(s), these are ignored and discarded (the same way
return values from synchronous `@Scheduled` methods are ignored).

In the following example, the `Flux` emits `onNext("Hello"), onNext("World")` every 5
seconds, but these values are unused:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(initialDelay = 5000, fixedRate = 5000)
public Flux<String> reactiveSomething() {
return Flux.just("Hello", "World");
}
----

If the `Publisher` emits an `onError` signal, it is logged at WARN level and recovered.
As a result, further scheduled subscription do happen despite the error.

In the following example, the `Mono` subscription fails twice in the first five seconds
then subscriptions start succeeding, printing a message to the standard output every five
seconds:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(initialDelay = 0, fixedRate = 5000)
public Mono<Void> reactiveSomething() {
AtomicInteger countdown = new AtomicInteger(2);

return Mono.defer(() -> {
if (countDown.get() == 0 || countDown.decrementAndGet() == 0) {
return Mono.fromRunnable(() -> System.out.println("Message"));
}
return Mono.error(new IllegalStateException("Cannot deliver message"));
})
}
----

[NOTE]
====
When destroying the annotated bean or closing the application context Spring Framework cancels
scheduled tasks, which includes the next scheduled subscription to the `Publisher` as well
as any past subscription that is still currently active (e.g. for long-running publishers,
or even infinite publishers).
====


[[scheduling-annotation-support-async]]
=== The `@Async` annotation
Expand Down
3 changes: 3 additions & 0 deletions spring-context/spring-context.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.reactivestreams:reactive-streams")
optional("io.projectreactor:reactor-core")
testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-aop")))
testImplementation(testFixtures(project(":spring-beans")))
Expand All @@ -38,6 +39,8 @@ dependencies {
testImplementation("org.awaitility:awaitility")
testImplementation("jakarta.inject:jakarta.inject-tck")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testImplementation("io.reactivex.rxjava3:rxjava")
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")
testRuntimeOnly("org.glassfish:jakarta.el")
// Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@
* a {@code void} return type; if not, the returned value will be ignored
* when called through the scheduler.
*
* <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
* The {@code Publisher} MUST support multiple subsequent subscriptions (i.e. be cold).
* The returned Publisher is only produced once, and the scheduling infrastructure
* then periodically {@code subscribe()} to it according to configuration.
* Values emitted by the publisher are ignored. Errors are logged at WARN level, which
* doesn't prevent further iterations. If a {@code fixed delay} is configured, the
* subscription is blocked upon in order to respect the fixed delay semantics.
*
* <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
* bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
* used to adapt the suspending function into a {@code Publisher} which is treated
* the same way as in the reactive method case (see above).
*
* <p>Processing of {@code @Scheduled} annotations is performed by
* registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
* done manually or, more conveniently, through the {@code <task:annotation-driven/>}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -98,6 +99,7 @@
* @author Elizabeth Chatman
* @author Victor Brown
* @author Sam Brannen
* @author Simon Baslé
* @since 3.0
* @see Scheduled
* @see EnableScheduling
Expand Down Expand Up @@ -143,6 +145,8 @@ public class ScheduledAnnotationBeanPostProcessor

private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);

private final Map<Object, List<Runnable>> reactiveSubscriptions = new IdentityHashMap<>(16);


/**
* Create a default {@code ScheduledAnnotationBeanPostProcessor}.
Expand Down Expand Up @@ -385,15 +389,33 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}

simonbasle marked this conversation as resolved.
Show resolved Hide resolved
/**
* Process the given {@code @Scheduled} method declaration on the given bean.
* Process the given {@code @Scheduled} method declaration on the given bean,
* attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive}
* methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
* @see #processScheduledSync(Scheduled, Method, Object)
* @see #processScheduledAsync(Scheduled, Method, Object)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
// Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledAsync(scheduled, method, bean);
return;
}
processScheduledSync(scheduled, method, bean);
}

/**
* Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable}
* accordingly. The Runnable can represent either a synchronous method invocation
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
*/
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Expand Down Expand Up @@ -516,6 +538,53 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean)
}
}

/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method MUST take no arguments. Its return value
* is ignored (if any) and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}

/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly
* scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* MUST either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}

/**
* Create a {@link Runnable} for the given bean instance,
* calling the specified scheduled method.
Expand Down Expand Up @@ -554,6 +623,8 @@ private static boolean isP(char ch) {
/**
* Return all currently scheduled tasks, from {@link Scheduled} methods
* as well as from programmatic {@link SchedulingConfigurer} interaction.
* <p>Note this includes upcoming scheduled subscriptions for reactive methods,
* but doesn't cover any currently active subscription for such methods.
* @since 5.0.2
*/
@Override
Expand All @@ -572,20 +643,27 @@ public Set<ScheduledTask> getScheduledTasks() {
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks;
List<Runnable> liveSubscriptions;
synchronized (this.scheduledTasks) {
tasks = this.scheduledTasks.remove(bean);
liveSubscriptions = this.reactiveSubscriptions.remove(bean);
}
if (tasks != null) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
if (liveSubscriptions != null) {
for (Runnable subscription : liveSubscriptions) {
subscription.run(); // equivalent to cancelling the subscription
}
}
}

@Override
public boolean requiresDestruction(Object bean) {
synchronized (this.scheduledTasks) {
return this.scheduledTasks.containsKey(bean);
return this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean);
}
}

Expand All @@ -599,6 +677,12 @@ public void destroy() {
}
}
this.scheduledTasks.clear();
Collection<List<Runnable>> allLiveSubscriptions = this.reactiveSubscriptions.values();
for (List<Runnable> liveSubscriptions : allLiveSubscriptions) {
for (Runnable liveSubscription : liveSubscriptions) {
liveSubscription.run(); //equivalent to cancelling the subscription
}
}
}
this.registrar.destroy();
}
Expand Down