diff --git a/reactor-core/src/main/java/reactor/core/publisher/Hooks.java b/reactor-core/src/main/java/reactor/core/publisher/Hooks.java index 956b5ae075..10e711d300 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Hooks.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Hooks.java @@ -433,77 +433,6 @@ public static void resetOnOperatorError() { } } - /** - * Add or replace a named scheduling {@link Function decorator}. With subsequent calls - * to this method, the onSchedule hook can be a composite of several sub-hooks, each - * with a different key. - *

- * The sub-hook is a {@link Function} taking the scheduled {@link Runnable}. - * It returns the decorated {@link Runnable}. - * - * @param key the key under which to set up the onSchedule sub-hook - * @param decorator the {@link Runnable} decorator to add (or replace, if key is already present) - * @see #resetOnSchedule(String) - * @see #resetOnSchedule() - */ - public static void onSchedule(String key, Function decorator) { - synchronized (onScheduleHooks) { - onScheduleHooks.put(key, decorator); - Function newHook = null; - for (Function function : onScheduleHooks.values()) { - if (newHook == null) { - newHook = function; - } - else { - newHook = newHook.andThen(function); - } - } - onScheduleHook = newHook; - } - } - - /** - * Reset a specific onSchedule {@link Function sub-hook} if it has been set up - * via {@link #onSchedule(String, Function)}. - * - * @param key the key for onSchedule sub-hook to remove - * @see #onSchedule(String, Function) - * @see #resetOnSchedule() - */ - public static void resetOnSchedule(String key) { - synchronized (onScheduleHooks) { - onScheduleHooks.remove(key); - if (onScheduleHooks.isEmpty()) { - onScheduleHook = Function.identity(); - } - else { - Function newHook = null; - for (Function function : onScheduleHooks.values()) { - if (newHook == null) { - newHook = function; - } - else { - newHook = newHook.andThen(function); - } - } - onScheduleHook = newHook; - } - } - } - - /** - * Remove all onSchedule {@link Function sub-hooks}. - * - * @see #onSchedule(String, Function) - * @see #resetOnSchedule(String) - */ - public static void resetOnSchedule() { - synchronized (onScheduleHooks) { - onScheduleHooks.clear(); - onScheduleHook = Function.identity(); - } - } - /** * Reset global error dropped strategy to bubbling back the error. */ @@ -571,7 +500,6 @@ static Function createOrUpdateOpHook(Collection onEachOperatorHook; static volatile Function onLastOperatorHook; static volatile BiFunction onOperatorErrorHook; - private static Function onScheduleHook = Function.identity(); //Hooks that are just callbacks static volatile Consumer onErrorDroppedHook; @@ -586,7 +514,6 @@ static Function createOrUpdateOpHook(Collection, ? extends Publisher>> onEachOperatorHooks; private static final LinkedHashMap, ? extends Publisher>> onLastOperatorHooks; private static final LinkedHashMap> onOperatorErrorHooks; - private static final LinkedHashMap> onScheduleHooks = new LinkedHashMap<>(1); //Immutable views on hook trackers, for testing purpose static final Map, ? extends Publisher>> getOnEachOperatorHooks() { @@ -599,10 +526,6 @@ static Function createOrUpdateOpHook(Collection getOnScheduleHook() { - return onScheduleHook; - } - static final Logger log = Loggers.getLogger(Hooks.class); /** diff --git a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java index 74498452ef..a28fd4081c 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java @@ -38,7 +38,6 @@ import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.Scannable; -import reactor.core.publisher.Hooks; import reactor.util.Logger; import reactor.util.Loggers; import reactor.util.Metrics; @@ -460,7 +459,7 @@ public static void setFactory(Factory factoryInstance) { * for this key. * @see #setExecutorServiceDecorator(String, BiFunction) * @see #removeExecutorServiceDecorator(String) - * @see Hooks#onSchedule(String, Function) + * @see Schedulers#onScheduleHook(String, Function) */ public static boolean addExecutorServiceDecorator(String key, BiFunction decorator) { synchronized (DECORATORS) { @@ -482,7 +481,7 @@ public static boolean addExecutorServiceDecorator(String key, BiFunction decorator) { synchronized (DECORATORS) { @@ -562,6 +561,93 @@ else if (owner instanceof DelegateServiceScheduler) { return factory.decorateExecutorService(schedulerType, () -> beforeFactory); } + /** + * Add or replace a named scheduling {@link Function decorator}. With subsequent calls + * to this method, the onScheduleHook hook can be a composite of several sub-hooks, each + * with a different key. + *

+ * The sub-hook is a {@link Function} taking the scheduled {@link Runnable}. + * It returns the decorated {@link Runnable}. + * + * @param key the key under which to set up the onScheduleHook sub-hook + * @param decorator the {@link Runnable} decorator to add (or replace, if key is already present) + * @see #resetOnScheduleHook(String) + * @see #resetOnScheduleHooks() + */ + public static void onScheduleHook(String key, Function decorator) { + synchronized (onScheduleHooks) { + onScheduleHooks.put(key, decorator); + Function newHook = null; + for (Function function : onScheduleHooks.values()) { + if (newHook == null) { + newHook = function; + } + else { + newHook = newHook.andThen(function); + } + } + onScheduleHook = newHook; + } + } + + /** + * Reset a specific onScheduleHook {@link Function sub-hook} if it has been set up + * via {@link #onScheduleHook(String, Function)}. + * + * @param key the key for onScheduleHook sub-hook to remove + * @see #onScheduleHook(String, Function) + * @see #resetOnScheduleHooks() + */ + public static void resetOnScheduleHook(String key) { + synchronized (onScheduleHooks) { + onScheduleHooks.remove(key); + if (onScheduleHooks.isEmpty()) { + onScheduleHook = Function.identity(); + } + else { + Function newHook = null; + for (Function function : onScheduleHooks.values()) { + if (newHook == null) { + newHook = function; + } + else { + newHook = newHook.andThen(function); + } + } + onScheduleHook = newHook; + } + } + } + + /** + * Remove all onScheduleHook {@link Function sub-hooks}. + * + * @see #onScheduleHook(String, Function) + * @see #resetOnScheduleHook(String) + */ + public static void resetOnScheduleHooks() { + synchronized (onScheduleHooks) { + onScheduleHooks.clear(); + onScheduleHook = null; + } + } + + /** + * Applies the hooks registered with {@link Schedulers#onScheduleHook(String, Function)}. + * + * @param runnable a {@link Runnable} submitted to a {@link Scheduler} + * @return decorated {@link Runnable} if any hook is registered, the original otherwise. + */ + public static Runnable onSchedule(Runnable runnable) { + Function hook = onScheduleHook; + if (hook != null) { + return hook.apply(runnable); + } + else { + return runnable; + } + } + /** * Clear any cached {@link Scheduler} and call dispose on them. */ @@ -702,6 +788,11 @@ default Scheduler newSingle(ThreadFactory threadFactory) { static volatile Factory factory = DEFAULT; + private static final LinkedHashMap> onScheduleHooks = new LinkedHashMap<>(1); + + @Nullable + private static Function onScheduleHook; + /** * Get a {@link CachedScheduler} out of the {@code reference} or create one using the * {@link Supplier} if the reference is empty, effectively creating a single instance @@ -833,7 +924,7 @@ static Disposable directSchedule(ScheduledExecutorService exec, Runnable task, long delay, TimeUnit unit) { - task = Hooks.getOnScheduleHook().apply(task); + task = onSchedule(task); SchedulerTask sr = new SchedulerTask(task); Future f; if (delay <= 0L) { @@ -852,7 +943,7 @@ static Disposable directSchedulePeriodically(ScheduledExecutorService exec, long initialDelay, long period, TimeUnit unit) { - task = Hooks.getOnScheduleHook().apply(task); + task = onSchedule(task); if (period <= 0L) { InstantPeriodicWorkerTask isr = @@ -882,7 +973,7 @@ static Disposable workerSchedule(ScheduledExecutorService exec, Runnable task, long delay, TimeUnit unit) { - task = Hooks.getOnScheduleHook().apply(task); + task = onSchedule(task); WorkerTask sr = new WorkerTask(task, tasks); if (!tasks.add(sr)) { @@ -914,7 +1005,7 @@ static Disposable workerSchedulePeriodically(ScheduledExecutorService exec, long initialDelay, long period, TimeUnit unit) { - task = Hooks.getOnScheduleHook().apply(task); + task = onSchedule(task); if (period <= 0L) { InstantPeriodicWorkerTask isr = diff --git a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java index b9aeae9928..cc3e7cb812 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java @@ -21,10 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Queue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -61,7 +58,6 @@ public void resetAllHooks() { Hooks.resetOnOperatorDebug(); Hooks.resetOnEachOperator(); Hooks.resetOnLastOperator(); - Hooks.resetOnSchedule(); } void simpleFlux(){ @@ -945,138 +941,4 @@ public void onNextDroppedFailReplaces() { Hooks.resetOnNextDropped(); } } - - @Test - public void onScheduleIsAdditive() throws Exception { - AtomicInteger tracker = new AtomicInteger(); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 1)); - Hooks.onSchedule("k2", new TrackingDecorator(tracker, 10)); - Hooks.onSchedule("k3", new TrackingDecorator(tracker, 100)); - - CountDownLatch latch = new CountDownLatch(3); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(tracker).as("3 decorators invoked").hasValue(111); - } - - @Test - public void onScheduleReplaces() throws Exception { - AtomicInteger tracker = new AtomicInteger(); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 1)); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 10)); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 100)); - - CountDownLatch latch = new CountDownLatch(1); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(tracker).hasValue(100); - } - - @Test - public void onScheduleWorksWhenEmpty() throws Exception { - AtomicInteger tracker = new AtomicInteger(); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 1)); - Hooks.resetOnSchedule("k1"); - - CountDownLatch latch = new CountDownLatch(1); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(tracker).hasValue(0); - } - - @Test - public void onScheduleIgnoresUnknownRemovals() { - assertThatCode(() -> Hooks.resetOnSchedule("k1")) - .doesNotThrowAnyException(); - } - - @Test - public void onScheduleResetOne() throws InterruptedException { - AtomicInteger tracker = new AtomicInteger(); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 1)); - Hooks.onSchedule("k2", new TrackingDecorator(tracker, 10)); - Hooks.onSchedule("k3", new TrackingDecorator(tracker, 100)); - Hooks.resetOnSchedule("k2"); - - CountDownLatch latch = new CountDownLatch(3); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(tracker).hasValue(101); - } - - @Test - public void onScheduleResetAll() throws InterruptedException { - AtomicInteger tracker = new AtomicInteger(); - Hooks.onSchedule("k1", new TrackingDecorator(tracker, 1)); - Hooks.onSchedule("k2", new TrackingDecorator(tracker, 10)); - Hooks.onSchedule("k3", new TrackingDecorator(tracker, 100)); - Hooks.resetOnSchedule(); - - CountDownLatch latch = new CountDownLatch(1); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(tracker).hasValue(0); - } - - @Test - public void onSchedulesAreOrdered() throws Exception { - CopyOnWriteArrayList items = new CopyOnWriteArrayList<>(); - Hooks.onSchedule("k1", new ApplicationOrderRecordingDecorator(items, "k1")); - Hooks.onSchedule("k2", new ApplicationOrderRecordingDecorator(items, "k2")); - Hooks.onSchedule("k3", new ApplicationOrderRecordingDecorator(items, "k3")); - - CountDownLatch latch = new CountDownLatch(1); - Schedulers.newElastic("foo").schedule(latch::countDown); - latch.await(5, TimeUnit.SECONDS); - - assertThat(items).containsExactly( - "k1#0", - "k2#0", - "k3#0" - ); - } - - private static class TrackingDecorator implements Function { - final AtomicInteger tracker; - final int dx; - - private TrackingDecorator(AtomicInteger tracker, int dx) { - this.tracker = tracker; - this.dx = dx; - } - - @Override - public Runnable apply(Runnable runnable) { - return () -> { - tracker.addAndGet(dx); - runnable.run(); - }; - } - } - - private static class ApplicationOrderRecordingDecorator - implements Function { - - final List items; - - final String id; - - final AtomicInteger counter = new AtomicInteger(); - - public ApplicationOrderRecordingDecorator(List items, String id) { - this.items = items; - this.id = id; - } - - @Override - public Runnable apply(Runnable runnable) { - items.add(id + "#" + counter.getAndIncrement()); - return runnable; - } - } } diff --git a/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java new file mode 100644 index 0000000000..71f8a51422 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.scheduler; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import org.junit.After; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +public class SchedulersHooksTest { + + @After + public void resetAllHooks() { + Schedulers.resetOnScheduleHooks(); + } + + @Test + public void onScheduleIsAdditive() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + + CountDownLatch latch = new CountDownLatch(3); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).as("3 decorators invoked").hasValue(111); + } + + @Test + public void onScheduleReplaces() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 100)); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(100); + } + + @Test + public void onScheduleWorksWhenEmpty() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.resetOnScheduleHook("k1"); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(0); + } + + @Test + public void onScheduleIgnoresUnknownRemovals() { + assertThatCode(() -> Schedulers.resetOnScheduleHook("k1")) + .doesNotThrowAnyException(); + } + + @Test + public void onScheduleResetOne() throws InterruptedException { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + Schedulers.resetOnScheduleHook("k2"); + + CountDownLatch latch = new CountDownLatch(3); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(101); + } + + @Test + public void onScheduleResetAll() throws InterruptedException { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + Schedulers.resetOnScheduleHooks(); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(0); + } + + @Test + public void onSchedulesAreOrdered() throws Exception { + CopyOnWriteArrayList items = new CopyOnWriteArrayList<>(); + Schedulers.onScheduleHook("k1", new ApplicationOrderRecordingDecorator(items, "k1")); + Schedulers.onScheduleHook("k2", new ApplicationOrderRecordingDecorator(items, "k2")); + Schedulers.onScheduleHook("k3", new ApplicationOrderRecordingDecorator(items, "k3")); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(items).containsExactly( + "k1#0", + "k2#0", + "k3#0" + ); + } + + private static class TrackingDecorator implements Function { + final AtomicInteger tracker; + final int dx; + + private TrackingDecorator(AtomicInteger tracker, int dx) { + this.tracker = tracker; + this.dx = dx; + } + + @Override + public Runnable apply(Runnable runnable) { + return () -> { + tracker.addAndGet(dx); + runnable.run(); + }; + } + } + + private static class ApplicationOrderRecordingDecorator + implements Function { + + final List items; + + final String id; + + final AtomicInteger counter = new AtomicInteger(); + + public ApplicationOrderRecordingDecorator(List items, String id) { + this.items = items; + this.id = id; + } + + @Override + public Runnable apply(Runnable runnable) { + items.add(id + "#" + counter.getAndIncrement()); + return runnable; + } + } +}