Skip to content

Commit

Permalink
Move onScheduleHook to Schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup authored and simonbasle committed Mar 27, 2019
1 parent b30fcb2 commit 9a4cb12
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 222 deletions.
77 changes: 0 additions & 77 deletions reactor-core/src/main/java/reactor/core/publisher/Hooks.java
Expand Up @@ -433,77 +433,6 @@ public static void resetOnOperatorError() {
}
}

/**
* Add or replace a named scheduling {@link Function decorator}. With subsequent calls
* to this method, the onSchedule hook can be a composite of several sub-hooks, each
* with a different key.
* <p>
* The sub-hook is a {@link Function} taking the scheduled {@link Runnable}.
* It returns the decorated {@link Runnable}.
*
* @param key the key under which to set up the onSchedule sub-hook
* @param decorator the {@link Runnable} decorator to add (or replace, if key is already present)
* @see #resetOnSchedule(String)
* @see #resetOnSchedule()
*/
public static void onSchedule(String key, Function<Runnable, Runnable> decorator) {
synchronized (onScheduleHooks) {
onScheduleHooks.put(key, decorator);
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}

/**
* Reset a specific onSchedule {@link Function sub-hook} if it has been set up
* via {@link #onSchedule(String, Function)}.
*
* @param key the key for onSchedule sub-hook to remove
* @see #onSchedule(String, Function)
* @see #resetOnSchedule()
*/
public static void resetOnSchedule(String key) {
synchronized (onScheduleHooks) {
onScheduleHooks.remove(key);
if (onScheduleHooks.isEmpty()) {
onScheduleHook = Function.identity();
}
else {
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}
}

/**
* Remove all onSchedule {@link Function sub-hooks}.
*
* @see #onSchedule(String, Function)
* @see #resetOnSchedule(String)
*/
public static void resetOnSchedule() {
synchronized (onScheduleHooks) {
onScheduleHooks.clear();
onScheduleHook = Function.identity();
}
}

/**
* Reset global error dropped strategy to bubbling back the error.
*/
Expand Down Expand Up @@ -571,7 +500,6 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
static volatile Function<Publisher, Publisher> onEachOperatorHook;
static volatile Function<Publisher, Publisher> onLastOperatorHook;
static volatile BiFunction<? super Throwable, Object, ? extends Throwable> onOperatorErrorHook;
private static Function<Runnable, Runnable> onScheduleHook = Function.identity();

//Hooks that are just callbacks
static volatile Consumer<? super Throwable> onErrorDroppedHook;
Expand All @@ -586,7 +514,6 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
private static final LinkedHashMap<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> onEachOperatorHooks;
private static final LinkedHashMap<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> onLastOperatorHooks;
private static final LinkedHashMap<String, BiFunction<? super Throwable, Object, ? extends Throwable>> onOperatorErrorHooks;
private static final LinkedHashMap<String, Function<Runnable, Runnable>> onScheduleHooks = new LinkedHashMap<>(1);

//Immutable views on hook trackers, for testing purpose
static final Map<String, Function<? super Publisher<Object>, ? extends Publisher<Object>>> getOnEachOperatorHooks() {
Expand All @@ -599,10 +526,6 @@ static Function<Publisher, Publisher> createOrUpdateOpHook(Collection<Function<?
return Collections.unmodifiableMap(onOperatorErrorHooks);
}

public static Function<Runnable, Runnable> getOnScheduleHook() {
return onScheduleHook;
}

static final Logger log = Loggers.getLogger(Hooks.class);

/**
Expand Down
105 changes: 98 additions & 7 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Expand Up @@ -38,7 +38,6 @@
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Hooks;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.Metrics;
Expand Down Expand Up @@ -460,7 +459,7 @@ public static void setFactory(Factory factoryInstance) {
* for this key.
* @see #setExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Hooks#onSchedule(String, Function)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static boolean addExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
Expand All @@ -482,7 +481,7 @@ public static boolean addExecutorServiceDecorator(String key, BiFunction<Schedul
* @param decorator the executor service decorator to add, if key not already present.
* @see #addExecutorServiceDecorator(String, BiFunction)
* @see #removeExecutorServiceDecorator(String)
* @see Hooks#onSchedule(String, Function)
* @see Schedulers#onScheduleHook(String, Function)
*/
public static void setExecutorServiceDecorator(String key, BiFunction<Scheduler, ScheduledExecutorService, ScheduledExecutorService> decorator) {
synchronized (DECORATORS) {
Expand Down Expand Up @@ -562,6 +561,93 @@ else if (owner instanceof DelegateServiceScheduler) {
return factory.decorateExecutorService(schedulerType, () -> beforeFactory);
}

/**
* Add or replace a named scheduling {@link Function decorator}. With subsequent calls
* to this method, the onScheduleHook hook can be a composite of several sub-hooks, each
* with a different key.
* <p>
* The sub-hook is a {@link Function} taking the scheduled {@link Runnable}.
* It returns the decorated {@link Runnable}.
*
* @param key the key under which to set up the onScheduleHook sub-hook
* @param decorator the {@link Runnable} decorator to add (or replace, if key is already present)
* @see #resetOnScheduleHook(String)
* @see #resetOnScheduleHooks()
*/
public static void onScheduleHook(String key, Function<Runnable, Runnable> decorator) {
synchronized (onScheduleHooks) {
onScheduleHooks.put(key, decorator);
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}

/**
* Reset a specific onScheduleHook {@link Function sub-hook} if it has been set up
* via {@link #onScheduleHook(String, Function)}.
*
* @param key the key for onScheduleHook sub-hook to remove
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHooks()
*/
public static void resetOnScheduleHook(String key) {
synchronized (onScheduleHooks) {
onScheduleHooks.remove(key);
if (onScheduleHooks.isEmpty()) {
onScheduleHook = Function.identity();
}
else {
Function<Runnable, Runnable> newHook = null;
for (Function<Runnable, Runnable> function : onScheduleHooks.values()) {
if (newHook == null) {
newHook = function;
}
else {
newHook = newHook.andThen(function);
}
}
onScheduleHook = newHook;
}
}
}

/**
* Remove all onScheduleHook {@link Function sub-hooks}.
*
* @see #onScheduleHook(String, Function)
* @see #resetOnScheduleHook(String)
*/
public static void resetOnScheduleHooks() {
synchronized (onScheduleHooks) {
onScheduleHooks.clear();
onScheduleHook = null;
}
}

/**
* Applies the hooks registered with {@link Schedulers#onScheduleHook(String, Function)}.
*
* @param runnable a {@link Runnable} submitted to a {@link Scheduler}
* @return decorated {@link Runnable} if any hook is registered, the original otherwise.
*/
public static Runnable onSchedule(Runnable runnable) {
Function<Runnable, Runnable> hook = onScheduleHook;
if (hook != null) {
return hook.apply(runnable);
}
else {
return runnable;
}
}

/**
* Clear any cached {@link Scheduler} and call dispose on them.
*/
Expand Down Expand Up @@ -702,6 +788,11 @@ default Scheduler newSingle(ThreadFactory threadFactory) {

static volatile Factory factory = DEFAULT;

private static final LinkedHashMap<String, Function<Runnable, Runnable>> onScheduleHooks = new LinkedHashMap<>(1);

@Nullable
private static Function<Runnable, Runnable> onScheduleHook;

/**
* Get a {@link CachedScheduler} out of the {@code reference} or create one using the
* {@link Supplier} if the reference is empty, effectively creating a single instance
Expand Down Expand Up @@ -833,7 +924,7 @@ static Disposable directSchedule(ScheduledExecutorService exec,
Runnable task,
long delay,
TimeUnit unit) {
task = Hooks.getOnScheduleHook().apply(task);
task = onSchedule(task);
SchedulerTask sr = new SchedulerTask(task);
Future<?> f;
if (delay <= 0L) {
Expand All @@ -852,7 +943,7 @@ static Disposable directSchedulePeriodically(ScheduledExecutorService exec,
long initialDelay,
long period,
TimeUnit unit) {
task = Hooks.getOnScheduleHook().apply(task);
task = onSchedule(task);

if (period <= 0L) {
InstantPeriodicWorkerTask isr =
Expand Down Expand Up @@ -882,7 +973,7 @@ static Disposable workerSchedule(ScheduledExecutorService exec,
Runnable task,
long delay,
TimeUnit unit) {
task = Hooks.getOnScheduleHook().apply(task);
task = onSchedule(task);

WorkerTask sr = new WorkerTask(task, tasks);
if (!tasks.add(sr)) {
Expand Down Expand Up @@ -914,7 +1005,7 @@ static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
long initialDelay,
long period,
TimeUnit unit) {
task = Hooks.getOnScheduleHook().apply(task);
task = onSchedule(task);

if (period <= 0L) {
InstantPeriodicWorkerTask isr =
Expand Down

0 comments on commit 9a4cb12

Please sign in to comment.