Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support @Scheduled on Reactive methods and Kotlin suspending functions #29924

Closed

Conversation

simonbasle
Copy link
Contributor

@simonbasle simonbasle commented Feb 3, 2023

This commit adds support for @Scheduled annotation on reactive methods
and on Kotlin suspending functions (provided a bridge to Reactive Streams
is available at runtime).

Reactive methods are methods that return an instance of Publisher or an
instance of a class that the ReactiveAdapterRegistry can adapt to a
Publisher with deferred support.

Kotlin suspending functions are converted to Publisher as well, using
CoroutineUtils and the support of the kotlinx.coroutines.reactor
bridge.

The bean method that produces the Publisher is only called once, but
there can be multiple subscriptions to that instance as configured by
the annotation. The usual task-based infrastructure for synchronous code
is reused for the purpose of scheduling the subscriptions.

One special case is when a fixedDelay is used, as the non-blocking
nature of subscribing to a Publisher makes it harder to adhere to these
semantics. As a result, for that particular case only, the subscription
is done in a blocking fashion inside the scheduled task.

Publisher onNext events are ignored. Active subscriptions are tracked
by the processor so that long-running publishers and infinite publishers
are also supported, allowing for cancellation if the associated bean is
destroyed or if the context is stopped.

See gh-23533
Closes gh-29924

@simonbasle simonbasle self-assigned this Feb 7, 2023
@simonbasle simonbasle added in: core Issues in core modules (aop, beans, core, context, expression) type: enhancement A general enhancement labels Feb 7, 2023
@simonbasle simonbasle added this to the 6.1.0-M1 milestone Feb 7, 2023
@sdeleuze
Copy link
Contributor

sdeleuze commented Feb 8, 2023

We should probably review #28515 at the same time and have a common approach.

@sdeleuze
Copy link
Contributor

sdeleuze commented Feb 14, 2023

As discussed, let's implement the Coroutines support as part of this PR via an invocation of CoroutinesUtils#invokeSuspendingFunction(Method, Object, Object...) when KotlinDetector#isSuspendingFunction detect a method as suspending, like we do for the Web support + related tests in a KotlinXxx.kt reusing the same logic from Java tests with suspending function instead of Mono, and if that make sense Flow instead of Flux.

Copy link
Contributor

@sdeleuze sdeleuze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we would keep ScheduledAnnotationReactiveSupport.isReactive(method) and processScheduledReactive(scheduled, method, bean) API as they are and we just adapt the implementations to support suspending functions?

@simonbasle
Copy link
Contributor Author

simonbasle commented Feb 16, 2023

What if we would keep ScheduledAnnotationReactiveSupport.isReactive(method) and processScheduledReactive(scheduled, method, bean) API as they are and we just adapt the implementations to support suspending functions?

@sdeleuze done :) As stated in the latest commit, anything Publisher is still in ScheduledAnnotationReactiveSupport, but now there is only one check to perform (and the getPublisherFor method used by ReactiveTask automatically detect the correct case).

@simonbasle simonbasle marked this pull request as ready for review April 20, 2023 08:58
Copy link
Contributor

@sdeleuze sdeleuze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, I just mentioned 2 possibles syles improvements in tests. Nice PR!

@bclozel
Copy link
Member

bclozel commented Apr 24, 2023

Thanks for tackling this - it's definitely the right time to address this as we'll work on observability in #29883.

Copy link
Contributor

@rstoyanchev rstoyanchev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good to see this planned.

My initial comment is to discuss the choice to use Reactor for scheduling. It means a more limited usage style, excluding cron expressions. Also, limits the usefulness of ReactorAdapterRegistry to a degree by requiring Reactor for the scheduling. I'm also wondering if ReactiveTask is going to cause classpath issues for sync methods but that's a more secondary, implementation concern.

Could the scheduling infrastructure used for sync also be used for reactive methods too, with some additional logic to subscribe to the Publisher every time a task is scheduled?

@simonbasle
Copy link
Contributor Author

simonbasle commented Apr 25, 2023

In the few last commits, I have attempted to take that approach which reuses a maximum of the existing infrastructure by using Runnable to subscribe to the Publishers. In a second step, I've attempted to reintroduce some tracking of active subscriptions. That way, Publisher subscriptions that don't terminate before the originating bean is destroyed will have a chance to be cancelled (including infinite publishers).

This commit adds support for `@Scheduled` annotation on reactive methods
in fixed delay or fixed rate mode. Cron mode is not supported.

Reactive methods are methods that return a Publisher or a subclass of
Publisher. This is only considered if Reactor (and Reactive Streams)
is present at runtime.

This is implemented using Reactor operators, as a `Flux<Void>` that
repeatedly flatmaps to the `Publisher` in the background, re-subscribing
to it according to the `@Scheduled` configuration. The method which
creates the `Publisher` is only called once.

If the `Publisher` errors, the exception is logged at warn level and
otherwise ignored. As a result the underlying `Flux` will continue
re-subscribing to the `Publisher`.

Note that if Reactor is not present, the standard processing applies
and the method itself will repeatedly be scheduled for execution,
ignoring the returned `Publisher` and not even subscribing to it. This
effectively becomes a no-op operation unless the method has other side
effects.

Closes spring-projectsgh-23533
The support's `isReactive` method checks Kotlin suspending functions
first then reactive (Publisher-returning) methods second. It asserts
the relevant runtime, all in a single call.

Similarly, turning the Method into a Publisher is done via a single
common helper method `getPublisherFor(Method, Object)`.

All imports of reactive classes and other reactive-specific logic is
still outsourced to ScheduledAnnotationReactiveSupport in order to avoid
any classpath issue in the bean postprocessor.
@simonbasle
Copy link
Contributor Author

simonbasle commented May 5, 2023

I have now rebased on top of main to take the Antora doc changes into account, and have updated the PR body to make it a relevant commit message for the current state of the PR.

@simonbasle simonbasle force-pushed the 23533-scheduledForPublishers branch from 62f461d to 5b3ae76 Compare May 5, 2023 13:19
@simonbasle simonbasle changed the title Add support for Scheduled on reactive methods Support @Scheduled on Reactive methods and Kotlin suspending functions May 5, 2023
@bclozel
Copy link
Member

bclozel commented Jun 5, 2023

I'm currently working on the observability support of scheduled tasks in #29883. It looks like the current proposal for reactive methods support has a difference of behavior. Let's use two variants of scheduled methods, one blocking and the other reactive, both throwing exceptions during their processing:

@Component
public class ScheduledComponent {

    private static final Logger logger = LoggerFactory.getLogger(ScheduledComponent.class);

    @Scheduled(cron = "0,10,20,30,40,50 * * * * *")
    public void blocking() {
        logger.info("Executing 'blocking' @Scheduled method");
        throw new IllegalStateException("Blocking method failed");
    }

    @Scheduled(cron = "2,12,22,32,42,52 * * * * *")
    public Mono<Void> reactive() {
        return Mono.error(() -> new IllegalStateException("Reactive method failed"))
                .doOnError(exc -> logger.info("Executing 'reactive' @Scheduled method"))
                .then();
    }
}

The blocking variant throws the exception from the generated runnable, and it's caught by the org.springframework.util.ErrorHandler infrastructure (this can be customized on TaskScheduler implementations):

2023-06-05T10:44:30.004+02:00  INFO 17840 --- [   scheduling-1] c.example.scheduling.ScheduledComponent  : Executing 'blocking' @Scheduled method
2023-06-05T10:44:30.004+02:00 ERROR 17840 --- [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

java.lang.IllegalStateException: Blocking method failed
	at com.example.scheduling.ScheduledComponent.blocking(ScheduledComponent.java:17) ~[main/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

With the current proposal for reactive support, exceptions are never thrown and the error handling contract is not used:

2023-06-05T10:44:32.019+02:00  INFO 17840 --- [   scheduling-1] c.example.scheduling.ScheduledComponent  : Executing 'reactive' @Scheduled method
2023-06-05T10:44:32.023+02:00  WARN 17840 --- [   scheduling-1] s.s.a.ScheduledAnnotationReactiveSupport : Unexpected error occurred in scheduled reactive task

java.lang.IllegalStateException: Reactive method failed
	at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ @Scheduled 'reactive()' in bean 'com.example.scheduling.ScheduledComponent'
Original Stack Trace:
		at com.example.scheduling.ScheduledComponent.lambda$reactive$0(ScheduledComponent.java:22) ~[main/:na]
		at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55) ~[reactor-core-3.5.6.jar:3.5.6]
		at reactor.core.publisher.Flux.subscribe(Flux.java:8671) ~[reactor-core-3.5.6.jar:3.5.6]
		at org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport$SubscribingRunnable.run(ScheduledAnnotationReactiveSupport.java:194) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
		at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
		at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96) ~[spring-context-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
		at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-06-05T10:44:32.027+02:00 ERROR 17840 --- [   scheduling-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

I don't see this as a limitation, but really as a key difference between imperative and async in general. This behavior is already documented in this PR, but maybe we can mention that the ErrorHandler will not be invoked. From an observability perspective, I think we can probably still record failures with the observations.

@bclozel bclozel self-assigned this Jun 5, 2023
@bclozel
Copy link
Member

bclozel commented Jun 5, 2023

Closed with 35052f2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: core Issues in core modules (aop, beans, core, context, expression) type: enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants