Skip to content

Commit

Permalink
Support @Scheduled fixedDelay/fixedRate on Publisher-returning methods
Browse files Browse the repository at this point in the history
This commit adds support for `@Scheduled` annotation on reactive
methods and Kotlin suspending functions.

Reactive methods are methods that return a `Publisher` or a subclass
of `Publisher`. The `ReactiveAdapterRegistry` is used to support many
implementations, such as `Flux`, `Mono`, `Flow`, `Single`, etc.
Methods should not take any argument and published values will be
ignored, as they are already with synchronous support.

This is implemented in `ScheduledAnnotationReactiveSupport`, which
"converts" Publishers to `Runnable`. This strategy keeps track of
active Subscriptions in the `ScheduledAnnotationBeanPostProcessor`,
in order to cancel them all in case of shutdown.
The existing scheduling support for tasks is reused, aligning the
triggering behavior with the existing support: cron, fixedDelay and
fixedRate are all supported strategies.

If the `Publisher` errors, the exception is logged at warn level and
otherwise ignored. As a result new `Runnable` instances will be
created for each execution and scheduling will continue.
The only difference with synchronous support is that error signals
will not be thrown by those `Runnable` tasks and will not be made
available to the `org.springframework.util.ErrorHandler` contract.
This is due to the asynchronous and lazy nature of Publishers.

Closes gh-23533
Closes gh-28515
  • Loading branch information
simonbasle authored and bclozel committed Jun 5, 2023
1 parent 53f8912 commit 35052f2
Show file tree
Hide file tree
Showing 7 changed files with 877 additions and 4 deletions.
111 changes: 111 additions & 0 deletions framework-docs/modules/ROOT/pages/integration/scheduling.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,117 @@ container and once through the `@Configurable` aspect), with the consequence of
`@Scheduled` method being invoked twice.
====

[[scheduling-annotation-support-scheduled-reactive]]
=== The `@Scheduled` annotation on Reactive methods or Kotlin suspending functions

As of Spring Framework 6.1, `@Scheduled` methods are also supported on several types
of reactive methods:

- methods with a `Publisher` return type (or any concrete implementation of `Publisher`)
like in the following example:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
public Publisher<Void> reactiveSomething() {
// return an instance of Publisher
}
----

- methods with a return type that can be adapted to `Publisher` via the shared instance
of the `ReactiveAdapterRegistry`, provided the type supports _deferred subscription_ like
in the following example:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
public Single<String> rxjavaNonPublisher() {
return Single.just("example");
}
----

[NOTE]
====
The `CompletableFuture` class is an example of a type that can typically be adapted
to `Publisher` but doesn't support deferred subscription. Its `ReactiveAdapter` in the
registry denotes that by having the `getDescriptor().isDeferred()` method return `false`.
====


- Kotlin suspending functions, like in the following example:

[source,kotlin,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
suspend fun something() {
// do something asynchronous
}
----

- methods that return a Kotlin `Flow` or `Deferred` instance, like in the following example:

[source,kotlin,indent=0,subs="verbatim,quotes"]
----
@Scheduled(fixedDelay = 500)
fun something(): Flow<Void> {
flow {
// do something asynchronous
}
}
----

All these types of methods must be declared without any arguments. In the case of Kotlin
suspending functions the `kotlinx.coroutines.reactor` bridge must also be present to allow
the framework to invoke a suspending function as a `Publisher`.

The Spring Framework will obtain a `Publisher` out of the annotated method once and will
schedule a `Runnable` in which it subscribes to said `Publisher`. These inner regular
subscriptions happen according to the `cron`/fixedDelay`/`fixedRate` configuration.

If the `Publisher` emits `onNext` signal(s), these are ignored and discarded (the same way
return values from synchronous `@Scheduled` methods are ignored).

In the following example, the `Flux` emits `onNext("Hello"), onNext("World")` every 5
seconds, but these values are unused:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(initialDelay = 5000, fixedRate = 5000)
public Flux<String> reactiveSomething() {
return Flux.just("Hello", "World");
}
----

If the `Publisher` emits an `onError` signal, it is logged at WARN level and recovered.
As a result, further scheduled subscription do happen despite the error.

In the following example, the `Mono` subscription fails twice in the first five seconds
then subscriptions start succeeding, printing a message to the standard output every five
seconds:

[source,java,indent=0,subs="verbatim,quotes"]
----
@Scheduled(initialDelay = 0, fixedRate = 5000)
public Mono<Void> reactiveSomething() {
AtomicInteger countdown = new AtomicInteger(2);
return Mono.defer(() -> {
if (countDown.get() == 0 || countDown.decrementAndGet() == 0) {
return Mono.fromRunnable(() -> System.out.println("Message"));
}
return Mono.error(new IllegalStateException("Cannot deliver message"));
})
}
----

[NOTE]
====
When destroying the annotated bean or closing the application context Spring Framework cancels
scheduled tasks, which includes the next scheduled subscription to the `Publisher` as well
as any past subscription that is still currently active (e.g. for long-running publishers,
or even infinite publishers).
====


[[scheduling-annotation-support-async]]
=== The `@Async` annotation
Expand Down
3 changes: 3 additions & 0 deletions spring-context/spring-context.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.reactivestreams:reactive-streams")
optional("io.projectreactor:reactor-core")
testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-aop")))
testImplementation(testFixtures(project(":spring-beans")))
Expand All @@ -38,6 +39,8 @@ dependencies {
testImplementation("org.awaitility:awaitility")
testImplementation("jakarta.inject:jakarta.inject-tck")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testImplementation("io.reactivex.rxjava3:rxjava")
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")
testRuntimeOnly("org.glassfish:jakarta.el")
// Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@
* a {@code void} return type; if not, the returned value will be ignored
* when called through the scheduler.
*
* <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
* The {@code Publisher} MUST support multiple subsequent subscriptions (i.e. be cold).
* The returned Publisher is only produced once, and the scheduling infrastructure
* then periodically {@code subscribe()} to it according to configuration.
* Values emitted by the publisher are ignored. Errors are logged at WARN level, which
* doesn't prevent further iterations. If a {@code fixed delay} is configured, the
* subscription is blocked upon in order to respect the fixed delay semantics.
*
* <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
* bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
* used to adapt the suspending function into a {@code Publisher} which is treated
* the same way as in the reactive method case (see above).
*
* <p>Processing of {@code @Scheduled} annotations is performed by
* registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
* done manually or, more conveniently, through the {@code <task:annotation-driven/>}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -98,6 +99,7 @@
* @author Elizabeth Chatman
* @author Victor Brown
* @author Sam Brannen
* @author Simon Baslé
* @since 3.0
* @see Scheduled
* @see EnableScheduling
Expand Down Expand Up @@ -143,6 +145,8 @@ public class ScheduledAnnotationBeanPostProcessor

private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);

private final Map<Object, List<Runnable>> reactiveSubscriptions = new IdentityHashMap<>(16);


/**
* Create a default {@code ScheduledAnnotationBeanPostProcessor}.
Expand Down Expand Up @@ -385,15 +389,33 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}

/**
* Process the given {@code @Scheduled} method declaration on the given bean.
* Process the given {@code @Scheduled} method declaration on the given bean,
* attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive}
* methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
* @see #processScheduledSync(Scheduled, Method, Object)
* @see #processScheduledAsync(Scheduled, Method, Object)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
// Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledAsync(scheduled, method, bean);
return;
}
processScheduledSync(scheduled, method, bean);
}

/**
* Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable}
* accordingly. The Runnable can represent either a synchronous method invocation
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
*/
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Expand Down Expand Up @@ -516,6 +538,53 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean)
}
}

/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method MUST take no arguments. Its return value
* is ignored (if any) and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}

/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly
* scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* MUST either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}

/**
* Create a {@link Runnable} for the given bean instance,
* calling the specified scheduled method.
Expand Down Expand Up @@ -554,6 +623,8 @@ private static boolean isP(char ch) {
/**
* Return all currently scheduled tasks, from {@link Scheduled} methods
* as well as from programmatic {@link SchedulingConfigurer} interaction.
* <p>Note this includes upcoming scheduled subscriptions for reactive methods,
* but doesn't cover any currently active subscription for such methods.
* @since 5.0.2
*/
@Override
Expand All @@ -572,20 +643,27 @@ public Set<ScheduledTask> getScheduledTasks() {
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks;
List<Runnable> liveSubscriptions;
synchronized (this.scheduledTasks) {
tasks = this.scheduledTasks.remove(bean);
liveSubscriptions = this.reactiveSubscriptions.remove(bean);
}
if (tasks != null) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
if (liveSubscriptions != null) {
for (Runnable subscription : liveSubscriptions) {
subscription.run(); // equivalent to cancelling the subscription
}
}
}

@Override
public boolean requiresDestruction(Object bean) {
synchronized (this.scheduledTasks) {
return this.scheduledTasks.containsKey(bean);
return this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean);
}
}

Expand All @@ -599,6 +677,12 @@ public void destroy() {
}
}
this.scheduledTasks.clear();
Collection<List<Runnable>> allLiveSubscriptions = this.reactiveSubscriptions.values();
for (List<Runnable> liveSubscriptions : allLiveSubscriptions) {
for (Runnable liveSubscription : liveSubscriptions) {
liveSubscription.run(); //equivalent to cancelling the subscription
}
}
}
this.registrar.destroy();
}
Expand Down

0 comments on commit 35052f2

Please sign in to comment.