New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add simplified API for wrapping scheduled tasks. #1546
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1546 +/- ##
============================================
+ Coverage 84.2% 84.33% +0.13%
- Complexity 3907 3918 +11
============================================
Files 359 359
Lines 29920 29958 +38
Branches 5566 5574 +8
============================================
+ Hits 25194 25265 +71
+ Misses 3096 3075 -21
+ Partials 1630 1618 -12
Continue to review full report at Codecov.
|
reactor-core/src/main/java/reactor/core/publisher/HookableScheduledExecutorService.java
Outdated
Show resolved
Hide resolved
public <V> ScheduledFuture<V> schedule(Callable<V> callable, | ||
long delay, | ||
TimeUnit unit) { | ||
return (ScheduledFuture<V>) delegate.schedule(wrap(callable), delay, unit); //FIXME |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsideup @smaldini I can see why the new interface was proposed to map both Runnable
and Callable
. Maybe instead of hard casting we should throw UOE, saying that reactor internals are not supposed to call ScheduledExecutorService
Callable-based methods?
And probably hunt down every place where we have access to an executor to introduce a comment that such executors' Callable-based methods are off limits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we leak the ScheduledExecutorService
to our public API, these UOE make me feel uneasy to be honest. Did you come to some agreement with @smaldini about this?
|
||
@Override | ||
public <T> Future<T> submit(Callable<T> task) { | ||
return (Future<T>) delegate.submit(wrap(task)); //FIXME |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here as FIXME above
* @see #resetOnSchedule(String) | ||
* @see #resetOnSchedule() | ||
*/ | ||
public static void onSchedule(String key, Function<Runnable, Runnable> decorator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsideup I aligned the naming of the method and its behavior with other hooks. Main change is that it is now "add or replace" (put
). Unless there is a compelling reason for this particular hook to be putIfAbsent
unlike the others?
* This key is used to add the {@link #onSchedule(String, Function)} composite hook as | ||
* a {@link Schedulers#addExecutorServiceDecorator(String, BiFunction) executorService decorator}. | ||
*/ | ||
private static final String EXECUTOR_DECORATOR_ONSCHEDULE = "reactor.onSchedule"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bsideup I changed the constant name a bit and made it private because other constants at this level are used for storing a hook override in the Context
(hence the .local
suffix). This is different though: a key for the Schedulers
own "hook" (the executor decorator).
@@ -70,7 +70,7 @@ public Void call() { | |||
try { | |||
try { | |||
task.run(); | |||
setRest(executor.submit(this)); | |||
setRest(executor.submit((Callable<Void>) this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this cast to Runnable
instead? (see discussion in the HookableScheduledExecutorService
)
Runnable
is the only type that is guaranteed to be maintained by the Function<Runnable, Runnable>
... Callable
is used internally mostly for short-circuiting subscription IIRC, but I don't think they've been actively used for tasks scheduling purposes so far.
Having this new hook kind of enforces us to limit our use of ExecutorService
to Runnable
-based methods... cc @smaldini
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it to align with the rest of tasks. AFAIR the problem is that ScheduledThreadPoolExecutor
wraps Runnable into Callable when submitted:
https://github.com/frohoff/jdk8u-jdk/blob/da0da73ab82ed714dc5be94acd2f0d00fbdfe2e9/src/share/classes/java/util/concurrent/FutureTask.java#L152
reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Outdated
Show resolved
Hide resolved
7c34464
to
e2c9985
Compare
`Hooks.addOnScheduleDecorator(String, ScheduledTaskDecorator)` is a new API for wrapping the tasks submitted to Reactor's schedulers. It's implemented as a "sugar" for `addExecutorServiceDecorator`, but instead of registering a decorator per hook, we register a single decorator and iterate over the registered `ScheduledTaskDecorator`s.
- NotNull (from jetbrains) is not necessary. superseded by NonNullApi (from reactor) at package level anyway. - removed unused throws clauses for methods that throw UOE anyway - Reworked the hook naming and javadoc to reflect what we have with other transformative keyed hooks (onSchedule, resetOnSchedule(String) and resetOnSchedule()). onSchedule adding is now replacing decorators if a key exists.
46f7a7a
to
9a4cb12
Compare
`Schedulers.onScheduleHook(String, Function<Runnable,Runnable>)` is a new API for wrapping the tasks submitted to Reactor's schedulers. The hooks intercepts `Runnable`s before they passed to the executor.
Schedulers.onScheduleHook(String, Function<Runnable,Runnable>)
is a newAPI for wrapping the tasks submitted to Reactor's schedulers.