From 98bda90b359393d4dab43c4d0bc407aac816965b Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Thu, 28 Mar 2019 21:08:26 +0100 Subject: [PATCH] Add a simplified API for wrapping scheduled tasks. (#1546) `Schedulers.onScheduleHook(String, Function)` is a new API for wrapping the tasks submitted to Reactor's schedulers. The hooks intercepts `Runnable`s before they passed to the executor. --- .../java/reactor/core/publisher/Hooks.java | 1 - .../reactor/core/scheduler/Schedulers.java | 99 ++++++++++ .../reactor/core/publisher/HooksTest.java | 1 - .../core/scheduler/SchedulersHooksTest.java | 172 ++++++++++++++++++ 4 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java diff --git a/reactor-core/src/main/java/reactor/core/publisher/Hooks.java b/reactor-core/src/main/java/reactor/core/publisher/Hooks.java index 6de51d399d..10e711d300 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Hooks.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Hooks.java @@ -588,5 +588,4 @@ static > Publisher addAssemblyInfo(P publisher, Ass } return new FluxOnAssembly<>((Flux) publisher, stacktrace); } - } diff --git a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java index 7efb57f04a..a28fd4081c 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import reactor.core.Disposable; @@ -458,6 +459,7 @@ public static void setFactory(Factory factoryInstance) { * for this key. * @see #setExecutorServiceDecorator(String, BiFunction) * @see #removeExecutorServiceDecorator(String) + * @see Schedulers#onScheduleHook(String, Function) */ public static boolean addExecutorServiceDecorator(String key, BiFunction decorator) { synchronized (DECORATORS) { @@ -479,6 +481,7 @@ public static boolean addExecutorServiceDecorator(String key, BiFunction decorator) { synchronized (DECORATORS) { @@ -558,6 +561,93 @@ else if (owner instanceof DelegateServiceScheduler) { return factory.decorateExecutorService(schedulerType, () -> beforeFactory); } + /** + * Add or replace a named scheduling {@link Function decorator}. With subsequent calls + * to this method, the onScheduleHook hook can be a composite of several sub-hooks, each + * with a different key. + *

+ * The sub-hook is a {@link Function} taking the scheduled {@link Runnable}. + * It returns the decorated {@link Runnable}. + * + * @param key the key under which to set up the onScheduleHook sub-hook + * @param decorator the {@link Runnable} decorator to add (or replace, if key is already present) + * @see #resetOnScheduleHook(String) + * @see #resetOnScheduleHooks() + */ + public static void onScheduleHook(String key, Function decorator) { + synchronized (onScheduleHooks) { + onScheduleHooks.put(key, decorator); + Function newHook = null; + for (Function function : onScheduleHooks.values()) { + if (newHook == null) { + newHook = function; + } + else { + newHook = newHook.andThen(function); + } + } + onScheduleHook = newHook; + } + } + + /** + * Reset a specific onScheduleHook {@link Function sub-hook} if it has been set up + * via {@link #onScheduleHook(String, Function)}. + * + * @param key the key for onScheduleHook sub-hook to remove + * @see #onScheduleHook(String, Function) + * @see #resetOnScheduleHooks() + */ + public static void resetOnScheduleHook(String key) { + synchronized (onScheduleHooks) { + onScheduleHooks.remove(key); + if (onScheduleHooks.isEmpty()) { + onScheduleHook = Function.identity(); + } + else { + Function newHook = null; + for (Function function : onScheduleHooks.values()) { + if (newHook == null) { + newHook = function; + } + else { + newHook = newHook.andThen(function); + } + } + onScheduleHook = newHook; + } + } + } + + /** + * Remove all onScheduleHook {@link Function sub-hooks}. + * + * @see #onScheduleHook(String, Function) + * @see #resetOnScheduleHook(String) + */ + public static void resetOnScheduleHooks() { + synchronized (onScheduleHooks) { + onScheduleHooks.clear(); + onScheduleHook = null; + } + } + + /** + * Applies the hooks registered with {@link Schedulers#onScheduleHook(String, Function)}. + * + * @param runnable a {@link Runnable} submitted to a {@link Scheduler} + * @return decorated {@link Runnable} if any hook is registered, the original otherwise. + */ + public static Runnable onSchedule(Runnable runnable) { + Function hook = onScheduleHook; + if (hook != null) { + return hook.apply(runnable); + } + else { + return runnable; + } + } + /** * Clear any cached {@link Scheduler} and call dispose on them. */ @@ -698,6 +788,11 @@ default Scheduler newSingle(ThreadFactory threadFactory) { static volatile Factory factory = DEFAULT; + private static final LinkedHashMap> onScheduleHooks = new LinkedHashMap<>(1); + + @Nullable + private static Function onScheduleHook; + /** * Get a {@link CachedScheduler} out of the {@code reference} or create one using the * {@link Supplier} if the reference is empty, effectively creating a single instance @@ -829,6 +924,7 @@ static Disposable directSchedule(ScheduledExecutorService exec, Runnable task, long delay, TimeUnit unit) { + task = onSchedule(task); SchedulerTask sr = new SchedulerTask(task); Future f; if (delay <= 0L) { @@ -847,6 +943,7 @@ static Disposable directSchedulePeriodically(ScheduledExecutorService exec, long initialDelay, long period, TimeUnit unit) { + task = onSchedule(task); if (period <= 0L) { InstantPeriodicWorkerTask isr = @@ -876,6 +973,7 @@ static Disposable workerSchedule(ScheduledExecutorService exec, Runnable task, long delay, TimeUnit unit) { + task = onSchedule(task); WorkerTask sr = new WorkerTask(task, tasks); if (!tasks.add(sr)) { @@ -907,6 +1005,7 @@ static Disposable workerSchedulePeriodically(ScheduledExecutorService exec, long initialDelay, long period, TimeUnit unit) { + task = onSchedule(task); if (period <= 0L) { InstantPeriodicWorkerTask isr = diff --git a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java index 6d752f86b7..cc3e7cb812 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/HooksTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Level; diff --git a/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java new file mode 100644 index 0000000000..71f8a51422 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersHooksTest.java @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.scheduler; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import org.junit.After; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +public class SchedulersHooksTest { + + @After + public void resetAllHooks() { + Schedulers.resetOnScheduleHooks(); + } + + @Test + public void onScheduleIsAdditive() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + + CountDownLatch latch = new CountDownLatch(3); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).as("3 decorators invoked").hasValue(111); + } + + @Test + public void onScheduleReplaces() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 100)); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(100); + } + + @Test + public void onScheduleWorksWhenEmpty() throws Exception { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.resetOnScheduleHook("k1"); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(0); + } + + @Test + public void onScheduleIgnoresUnknownRemovals() { + assertThatCode(() -> Schedulers.resetOnScheduleHook("k1")) + .doesNotThrowAnyException(); + } + + @Test + public void onScheduleResetOne() throws InterruptedException { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + Schedulers.resetOnScheduleHook("k2"); + + CountDownLatch latch = new CountDownLatch(3); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(101); + } + + @Test + public void onScheduleResetAll() throws InterruptedException { + AtomicInteger tracker = new AtomicInteger(); + Schedulers.onScheduleHook("k1", new TrackingDecorator(tracker, 1)); + Schedulers.onScheduleHook("k2", new TrackingDecorator(tracker, 10)); + Schedulers.onScheduleHook("k3", new TrackingDecorator(tracker, 100)); + Schedulers.resetOnScheduleHooks(); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(tracker).hasValue(0); + } + + @Test + public void onSchedulesAreOrdered() throws Exception { + CopyOnWriteArrayList items = new CopyOnWriteArrayList<>(); + Schedulers.onScheduleHook("k1", new ApplicationOrderRecordingDecorator(items, "k1")); + Schedulers.onScheduleHook("k2", new ApplicationOrderRecordingDecorator(items, "k2")); + Schedulers.onScheduleHook("k3", new ApplicationOrderRecordingDecorator(items, "k3")); + + CountDownLatch latch = new CountDownLatch(1); + Schedulers.newElastic("foo").schedule(latch::countDown); + latch.await(5, TimeUnit.SECONDS); + + assertThat(items).containsExactly( + "k1#0", + "k2#0", + "k3#0" + ); + } + + private static class TrackingDecorator implements Function { + final AtomicInteger tracker; + final int dx; + + private TrackingDecorator(AtomicInteger tracker, int dx) { + this.tracker = tracker; + this.dx = dx; + } + + @Override + public Runnable apply(Runnable runnable) { + return () -> { + tracker.addAndGet(dx); + runnable.run(); + }; + } + } + + private static class ApplicationOrderRecordingDecorator + implements Function { + + final List items; + + final String id; + + final AtomicInteger counter = new AtomicInteger(); + + public ApplicationOrderRecordingDecorator(List items, String id) { + this.items = items; + this.id = id; + } + + @Override + public Runnable apply(Runnable runnable) { + items.add(id + "#" + counter.getAndIncrement()); + return runnable; + } + } +}