diff --git a/core/pom.xml b/core/pom.xml index e5a0b93..ce2141a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -6,11 +6,11 @@ com.cloudogu.legman legman - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT core - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT core @@ -18,7 +18,7 @@ com.google.guava guava - 30.0-jre + 32.0.0-jre diff --git a/core/src/main/java/com/github/legman/EventBus.java b/core/src/main/java/com/github/legman/EventBus.java index 05a3204..171aeed 100644 --- a/core/src/main/java/com/github/legman/EventBus.java +++ b/core/src/main/java/com/github/legman/EventBus.java @@ -117,7 +117,6 @@ public class EventBus { * A thread-safe cache for flattenHierarchy(). The Class class is immutable. This cache is not shared between * instances in order to avoid class loader leaks, in environments where classes will be load dynamically. */ - @SuppressWarnings("UnstableApiUsage") private final LoadingCache, Set>> flattenHierarchyCache = CacheBuilder.newBuilder() .weakKeys() @@ -161,7 +160,7 @@ public Set> load(@Nonnull Class concreteClass) { private final String identifier; /** executor for handling asynchronous events */ - private final Executor executor; + private final ExecutorSerializer executor; /** list of invocation interceptors **/ private final List invocationInterceptors; @@ -194,7 +193,7 @@ public EventBus(String identifier) { private EventBus(Builder builder) { this.identifier = builder.identifier; - this.executor = createExecutor(builder); + this.executor = new ExecutorSerializer(createExecutor(builder)); this.invocationInterceptors = Collections.unmodifiableList(builder.invocationInterceptors); this.finder = new AnnotatedHandlerFinder(); } @@ -384,8 +383,6 @@ void enqueueEvent(Object event, EventHandler handler) { } } - - /** * Dispatch {@code events} in the order they were posted, regardless of * the posting thread. @@ -441,26 +438,19 @@ void dispatch(final Object event, final EventHandler wrapper) { } if ( wrapper.isAsync() ){ - executor.execute(() -> dispatchSynchronous(event, wrapper)); + executor.dispatchAsynchronous(event, wrapper); } else { - dispatchSynchronous(event, wrapper); + execute(event, wrapper); } } - void dispatchSynchronous(Object event, EventHandler wrapper){ + private void execute(Object event, EventHandler wrapper) { try { wrapper.handleEvent(event); } catch (InvocationTargetException e) { - if ( wrapper.isAsync() ){ - StringBuilder msg = new StringBuilder(identifier); - msg.append(" - could not dispatch event: ").append(event); - msg.append(" to handler ").append(wrapper); - logger.error(msg.toString(), e); - } else { - Throwable cause = e.getCause(); - Throwables.propagateIfPossible(cause); - throw new EventBusException(event, "could not dispatch event", cause); - } + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause); + throw new EventBusException(event, "could not dispatch event", cause); } } @@ -488,9 +478,7 @@ Set> flattenHierarchy(Class concreteClass) { */ public void shutdown() { shutdown.set(true); - if (executor instanceof ExecutorService) { - ((ExecutorService) executor).shutdown(); - } + executor.shutdown(); } /** @@ -602,4 +590,5 @@ public EventBus build() { return new EventBus(this); } } + } diff --git a/core/src/main/java/com/github/legman/EventHandler.java b/core/src/main/java/com/github/legman/EventHandler.java index 8c38ccb..20a188c 100644 --- a/core/src/main/java/com/github/legman/EventHandler.java +++ b/core/src/main/java/com/github/legman/EventHandler.java @@ -156,4 +156,8 @@ public String toString() { .addValue(async) .toString(); } + + boolean hasToBeSynchronized() { + return false; + } } diff --git a/core/src/main/java/com/github/legman/ExecutorSerializer.java b/core/src/main/java/com/github/legman/ExecutorSerializer.java new file mode 100644 index 0000000..a8c7dfe --- /dev/null +++ b/core/src/main/java/com/github/legman/ExecutorSerializer.java @@ -0,0 +1,122 @@ +package com.github.legman; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * This class is used to guard the executor from being blocked by long-running + * event handlers. Instead of dispatching more events for such a event handler + * which might lead to a completely blocked event bus, this class dispatches only + * one event at a time for a specific event handler (despite if the handler is + * marked to be concurrent; in this case the events are dispatched as soon as they + * arrive). Further events are put into a queue, that is taken into account again + * whenever a process finishes. + */ +class ExecutorSerializer { + + private static final Logger logger = LoggerFactory.getLogger(ExecutorSerializer.class); + + /** + * The underlying java executor to handle the actual processing. + */ + private final Executor executor; + + /** + * Set of handler, that are awaiting execution or currently are executing an event. + * Further events for handler in this collection are queued in {@link #queuedEvents}. + */ + private final Set runningHandlers = new HashSet<>(); + /** + * Queue of handlers and events that could not have been processed right away, because + * the handler already is 'busy' with another event. + */ + private final Queue queuedEvents = new LinkedList<>(); + + ExecutorSerializer(Executor executor) { + this.executor = executor; + } + + /** + * This takes an event and a handler to dispatch it using the {@link #executor}. If the + * handler has to be synchronized (aka is marked as non-concurrent, {@link EventHandler#hasToBeSynchronized()}), + * this is done in the following process, otherwise it is 'put into' the {@link #executor} + * right away. + */ + void dispatchAsynchronous(final Object event, final EventHandler wrapper) { + if (wrapper.hasToBeSynchronized()) { + executeSynchronized(event, wrapper); + } else { + logger.debug("executing handler concurrently: {}", wrapper); + executor.execute(() -> dispatchDirectly(event, wrapper)); + } + } + + private void dispatchDirectly(Object event, EventHandler wrapper) { + try { + wrapper.handleEvent(event); + } catch (InvocationTargetException e) { + logger.error("could not dispatch event: {} to handler {}", event, wrapper, e); + } + } + + private synchronized void executeSynchronized(final Object event, final EventHandler wrapper) { + if (runningHandlers.contains(wrapper)) { + logger.debug("postponing execution of handler {}; there are already {} other handlers waiting", wrapper, queuedEvents.size()); + queuedEvents.add(new EventBus.EventWithHandler(event, wrapper)); + } else { + runningHandlers.add(wrapper); + executor.execute(() -> { + try { + dispatchDirectly(event, wrapper); + } finally { + releaseRunningHandlerAndTriggerWaitingHandlers(wrapper); + } + }); + } + } + + private synchronized void releaseRunningHandlerAndTriggerWaitingHandlers(EventHandler wrapper) { + runningHandlers.remove(wrapper); + logger.debug("checking {} waiting handlers for possible execution", queuedEvents.size()); + for (Iterator iterator = queuedEvents.iterator(); iterator.hasNext(); ) { + EventBus.EventWithHandler queuedHandler = iterator.next(); + if (runningHandlers.contains(queuedHandler.handler)) { + logger.debug("execution of handler still waiting, because other call is still running: {}", wrapper); + } else { + logger.debug("executing postponed handler because it is no longer blocked: {}", wrapper); + iterator.remove(); + executeSynchronized(queuedHandler.event, queuedHandler.handler); + break; + } + } + } + + /** + * Triggers the shutdown of the {@link #executor} as soon as all events wating inside + * {@link #queuedEvents} are executed. + */ + void shutdown() { + executor.execute(() -> { + synchronized (this) { + if (queuedEvents.isEmpty()) { + logger.debug("no more handlers queued; shutting down executors"); + if (executor instanceof ExecutorService) { + ((ExecutorService) executor).shutdown(); + } + } else { + logger.debug("queued handlers found; postponing shutdown"); + shutdown(); + } + } + }); + } +} diff --git a/core/src/main/java/com/github/legman/SynchronizedEventHandler.java b/core/src/main/java/com/github/legman/SynchronizedEventHandler.java index 2f03d69..aac0eeb 100644 --- a/core/src/main/java/com/github/legman/SynchronizedEventHandler.java +++ b/core/src/main/java/com/github/legman/SynchronizedEventHandler.java @@ -16,7 +16,6 @@ package com.github.legman; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; /** @@ -46,10 +45,7 @@ public SynchronizedEventHandler(EventBus eventBus, Object target, Method method, } @Override - public void handleEvent(Object event) throws InvocationTargetException { - // https://code.google.com/p/guava-libraries/issues/detail?id=1403 - synchronized (this) { - super.handleEvent(event); - } + boolean hasToBeSynchronized() { + return true; } } diff --git a/core/src/test/java/com/github/legman/EventBusTest.java b/core/src/test/java/com/github/legman/EventBusTest.java index cd341df..fd3c4db 100644 --- a/core/src/test/java/com/github/legman/EventBusTest.java +++ b/core/src/test/java/com/github/legman/EventBusTest.java @@ -16,12 +16,25 @@ package com.github.legman; +import com.google.common.base.Stopwatch; import org.assertj.guava.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; - +import java.time.temporal.ChronoUnit; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.synchronizedCollection; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -155,9 +168,99 @@ void testShutdown() throws InterruptedException { assertThat(listener.event).isNull(); } - /** - * Listener classes - */ + @Test + void testConcurrentExecution() { + EventBus bus = new EventBus(); + ConcurrentListener concurrentListener = new ConcurrentListener(); + bus.register(concurrentListener); + bus.post("event"); + bus.post("event"); + bus.post("event"); + assertThat(concurrentListener.concurrentAccessDetected).isTrue(); + } + + @Test + void testNonConcurrentExecution() { + EventBus bus = new EventBus(); + NonConcurrentListener concurrentListener = new NonConcurrentListener(); + bus.register(concurrentListener); + bus.post("event"); + bus.post("event"); + bus.post("event"); + assertThat(concurrentListener.concurrentAccessDetected).isFalse(); + } + + @Test + void shouldNotBeBlockedByLongRunningEventHandlers() throws InterruptedException { + EventBus bus = new EventBus(); + LongRunningListener longRunningListener = new LongRunningListener(); + bus.register(longRunningListener); + QuickListener quickListener = new QuickListener(); + bus.register(quickListener); + + IntStream.range(0, 5).forEach(i -> bus.post("event-" + i)); + + quickListener.awaitCountDown(); + assertThat(longRunningListener.currentCount()).isZero(); + longRunningListener.finish(); + } + + @Test + void shouldNotSkipEventsAfterShutdown() { + EventBus bus = new EventBus(); + LongRunningListener longRunningListener = new LongRunningListener(); + bus.register(longRunningListener); + + IntStream.range(0, 5).forEach(i -> bus.post("event-" + i)); + + bus.shutdown(); + + longRunningListener.finish(); + + await().atMost(1, SECONDS).untilAsserted(() -> assertThat(longRunningListener.currentCount()).isEqualTo(5)); + } + + @Test + @Disabled("for manual testing only") + void shouldHandleMassiveEvents() { + EventBus bus = new EventBus(); + Collection listener = IntStream.range(1, 7) + .mapToObj(RandomNonConcurrentListener::new) + .collect(Collectors.toList()); + listener.forEach(bus::register); + + AtomicLong maxPostTime = new AtomicLong(0); + + new Thread(() -> + { + IntStream.range(0, 1000) + .peek(i -> { + try { + Thread.sleep((long) (200 * Math.random())); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .forEach(event -> { + System.out.printf("posting event %s%n", event); + Stopwatch stopwatch = Stopwatch.createStarted(); + bus.post(event); + long postTimeInMs = stopwatch.elapsed().get(ChronoUnit.NANOS) / 1000000; + System.out.printf("posting event %s took %sms%n", event, postTimeInMs); + maxPostTime.getAndAccumulate(postTimeInMs, Math::max); + }); + bus.shutdown(); + }).start(); + + await().atMost(1000, SECONDS) + .untilAsserted(() -> { + listener.forEach( + l -> assertThat(l.handledEventCount()).as("checking executed tasks of event handler #%s", l.nr).isEqualTo(1000) + ); + }); + + assertThat(maxPostTime.get()).isLessThan(100); + } private static class ThreadNameTestListener { @@ -239,4 +342,97 @@ public void handleEvent(String event) { } } + private static class ConcurrentListener { + + private final AtomicInteger currentAccessCount = new AtomicInteger(0); + private boolean concurrentAccessDetected = false; + + @Subscribe(allowConcurrentAccess = true) + public void handleEvent(String event) throws InterruptedException { + if (currentAccessCount.getAndIncrement() > 0) { + concurrentAccessDetected = true; + } + Thread.sleep(1000); + currentAccessCount.decrementAndGet(); + } + } + + private static class NonConcurrentListener { + + private final AtomicInteger currentAccessCount = new AtomicInteger(0); + private boolean concurrentAccessDetected = false; + + @Subscribe(allowConcurrentAccess = false) + public void handleEvent(String event) throws InterruptedException { + if (currentAccessCount.getAndIncrement() > 0) { + concurrentAccessDetected = true; + } + Thread.sleep(1000); + currentAccessCount.decrementAndGet(); + } + } + + private static class LongRunningListener { + + private final AtomicInteger readyCount = new AtomicInteger(0); + private final CountDownLatch latch = new CountDownLatch(1); + + @Subscribe + public void handleEvent(String event) { + try { + latch.await(); + readyCount.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + int currentCount() { + return readyCount.get(); + } + + public void finish() { + latch.countDown(); + } + } + + private static class QuickListener { + + private final CountDownLatch countDownLatch = new CountDownLatch(5); + + @Subscribe + public void handleEvent(String event) { + System.out.println(countDownLatch.getCount()); + countDownLatch.countDown(); + } + + void awaitCountDown() throws InterruptedException { + countDownLatch.await(); + } + } + + private static class RandomNonConcurrentListener { + + private final Collection handledEvents = synchronizedCollection(new HashSet<>(1000)); + private final int nr; + + public RandomNonConcurrentListener(int nr) { + this.nr = nr; + } + + @Subscribe + public void handleEvent(Integer event) { + try { + System.out.printf("Running %s - %s%n", nr, event); + Thread.sleep((long)(Math.random() * 1000 / Math.max(Math.random() * nr, 1))); + handledEvents.add(event); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private int handledEventCount() { + return handledEvents.size(); + } + } } diff --git a/legman-maven-plugin/pom.xml b/legman-maven-plugin/pom.xml index 9961571..762e600 100644 --- a/legman-maven-plugin/pom.xml +++ b/legman-maven-plugin/pom.xml @@ -6,11 +6,11 @@ com.cloudogu.legman legman - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT legman-maven-plugin - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT legman-maven-plugin maven-plugin @@ -31,7 +31,7 @@ com.cloudogu.legman core - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT diff --git a/pom.xml b/pom.xml index 4a8f3c8..2522dd8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,12 +6,12 @@ com.cloudogu.legman legman pom - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT This project is a fork of the EventBus of Google Guava legman - https://github.com/sdorra/legman + https://github.com/scm-manager/legman @@ -22,30 +22,25 @@ - sdorra - Sebastian Sdorra - s.sdorra@gmail.com + cloudogu + Cloudogu GmbH + info@cloudogu.com Europe/Berlin - scm:git:https://github.com/sdorra/legman.git - scm:git:https://github.com/sdorra/legman.git - https://github.com/sdorra/legman + scm:git:https://github.com/scm-manager/legman.git + scm:git:https://github.com/scm-manager/legman.git + https://github.com/scm-manager/legman HEAD github - https://github.com/sdorra/legman/issues + https://github.com/scm-manager/legman/issues - - Jenkins - https://scm-manager.ci.cloudbees.com/ - - 3.0.0 diff --git a/support/guice/pom.xml b/support/guice/pom.xml index 0c79497..f7a126c 100644 --- a/support/guice/pom.xml +++ b/support/guice/pom.xml @@ -6,11 +6,11 @@ com.cloudogu.legman.support support - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT guice - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT guice @@ -18,7 +18,7 @@ com.cloudogu.legman core - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT diff --git a/support/micrometer/pom.xml b/support/micrometer/pom.xml index a2e3ec2..e6b3eea 100644 --- a/support/micrometer/pom.xml +++ b/support/micrometer/pom.xml @@ -6,11 +6,11 @@ com.cloudogu.legman.support support - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT micrometer - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT micrometer @@ -18,7 +18,7 @@ com.cloudogu.legman core - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT diff --git a/support/pom.xml b/support/pom.xml index 809565e..8c11c63 100644 --- a/support/pom.xml +++ b/support/pom.xml @@ -6,12 +6,12 @@ com.cloudogu.legman legman - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT com.cloudogu.legman.support support - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT pom support diff --git a/support/shiro/pom.xml b/support/shiro/pom.xml index 19ab790..d5ec46d 100644 --- a/support/shiro/pom.xml +++ b/support/shiro/pom.xml @@ -6,11 +6,11 @@ com.cloudogu.legman.support support - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT shiro - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT shiro @@ -18,7 +18,7 @@ com.cloudogu.legman core - 2.0.1-SNAPSHOT + 2.0.2-SNAPSHOT