Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump guava from 30.0-jre to 32.0.0-jre in /core #8

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
<parent>
<groupId>com.cloudogu.legman</groupId>
<artifactId>legman</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.0.2-SNAPSHOT</version>
</parent>

<artifactId>core</artifactId>
<version>2.0.1-SNAPSHOT</version>
<version>2.0.2-SNAPSHOT</version>
<name>core</name>

<dependencies>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
<version>32.0.0-jre</version>
</dependency>

<dependency>
Expand Down
31 changes: 10 additions & 21 deletions core/src/main/java/com/github/legman/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public class EventBus {
* A thread-safe cache for flattenHierarchy(). The Class class is immutable. This cache is not shared between
* instances in order to avoid class loader leaks, in environments where classes will be load dynamically.
*/
@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache =
CacheBuilder.newBuilder()
.weakKeys()
Expand Down Expand Up @@ -161,7 +160,7 @@ public Set<Class<?>> load(@Nonnull Class<?> concreteClass) {
private final String identifier;

/** executor for handling asynchronous events */
private final Executor executor;
private final ExecutorSerializer executor;

/** list of invocation interceptors **/
private final List<InvocationInterceptor> invocationInterceptors;
Expand Down Expand Up @@ -194,7 +193,7 @@ public EventBus(String identifier) {

private EventBus(Builder builder) {
this.identifier = builder.identifier;
this.executor = createExecutor(builder);
this.executor = new ExecutorSerializer(createExecutor(builder));
this.invocationInterceptors = Collections.unmodifiableList(builder.invocationInterceptors);
this.finder = new AnnotatedHandlerFinder();
}
Expand Down Expand Up @@ -384,8 +383,6 @@ void enqueueEvent(Object event, EventHandler handler) {
}
}



/**
* Dispatch {@code events} in the order they were posted, regardless of
* the posting thread.
Expand Down Expand Up @@ -441,26 +438,19 @@ void dispatch(final Object event, final EventHandler wrapper) {
}

if ( wrapper.isAsync() ){
executor.execute(() -> dispatchSynchronous(event, wrapper));
executor.dispatchAsynchronous(event, wrapper);
} else {
dispatchSynchronous(event, wrapper);
execute(event, wrapper);
}
}

void dispatchSynchronous(Object event, EventHandler wrapper){
private void execute(Object event, EventHandler wrapper) {
try {
wrapper.handleEvent(event);
} catch (InvocationTargetException e) {
if ( wrapper.isAsync() ){
StringBuilder msg = new StringBuilder(identifier);
msg.append(" - could not dispatch event: ").append(event);
msg.append(" to handler ").append(wrapper);
logger.error(msg.toString(), e);
} else {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause);
throw new EventBusException(event, "could not dispatch event", cause);
}
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause);
throw new EventBusException(event, "could not dispatch event", cause);
}
}

Expand Down Expand Up @@ -488,9 +478,7 @@ Set<Class<?>> flattenHierarchy(Class<?> concreteClass) {
*/
public void shutdown() {
shutdown.set(true);
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
executor.shutdown();
}

/**
Expand Down Expand Up @@ -602,4 +590,5 @@ public EventBus build() {
return new EventBus(this);
}
}

}
4 changes: 4 additions & 0 deletions core/src/main/java/com/github/legman/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,8 @@ public String toString() {
.addValue(async)
.toString();
}

boolean hasToBeSynchronized() {
return false;
}
}
122 changes: 122 additions & 0 deletions core/src/main/java/com/github/legman/ExecutorSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.github.legman;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* This class is used to guard the executor from being blocked by long-running
* event handlers. Instead of dispatching more events for such a event handler
* which might lead to a completely blocked event bus, this class dispatches only
* one event at a time for a specific event handler (despite if the handler is
* marked to be concurrent; in this case the events are dispatched as soon as they
* arrive). Further events are put into a queue, that is taken into account again
* whenever a process finishes.
*/
class ExecutorSerializer {

private static final Logger logger = LoggerFactory.getLogger(ExecutorSerializer.class);

/**
* The underlying java executor to handle the actual processing.
*/
private final Executor executor;

/**
* Set of handler, that are awaiting execution or currently are executing an event.
* Further events for handler in this collection are queued in {@link #queuedEvents}.
*/
private final Set<EventHandler> runningHandlers = new HashSet<>();
/**
* Queue of handlers and events that could not have been processed right away, because
* the handler already is 'busy' with another event.
*/
private final Queue<EventBus.EventWithHandler> queuedEvents = new LinkedList<>();

ExecutorSerializer(Executor executor) {
this.executor = executor;
}

/**
* This takes an event and a handler to dispatch it using the {@link #executor}. If the
* handler has to be synchronized (aka is marked as non-concurrent, {@link EventHandler#hasToBeSynchronized()}),
* this is done in the following process, otherwise it is 'put into' the {@link #executor}
* right away.
*/
void dispatchAsynchronous(final Object event, final EventHandler wrapper) {
if (wrapper.hasToBeSynchronized()) {
executeSynchronized(event, wrapper);
} else {
logger.debug("executing handler concurrently: {}", wrapper);
executor.execute(() -> dispatchDirectly(event, wrapper));
}
}

private void dispatchDirectly(Object event, EventHandler wrapper) {
try {
wrapper.handleEvent(event);
} catch (InvocationTargetException e) {
logger.error("could not dispatch event: {} to handler {}", event, wrapper, e);
}
}

private synchronized void executeSynchronized(final Object event, final EventHandler wrapper) {
if (runningHandlers.contains(wrapper)) {
logger.debug("postponing execution of handler {}; there are already {} other handlers waiting", wrapper, queuedEvents.size());
queuedEvents.add(new EventBus.EventWithHandler(event, wrapper));
} else {
runningHandlers.add(wrapper);
executor.execute(() -> {
try {
dispatchDirectly(event, wrapper);
} finally {
releaseRunningHandlerAndTriggerWaitingHandlers(wrapper);
}
});
}
}

private synchronized void releaseRunningHandlerAndTriggerWaitingHandlers(EventHandler wrapper) {
runningHandlers.remove(wrapper);
logger.debug("checking {} waiting handlers for possible execution", queuedEvents.size());
for (Iterator<EventBus.EventWithHandler> iterator = queuedEvents.iterator(); iterator.hasNext(); ) {
EventBus.EventWithHandler queuedHandler = iterator.next();
if (runningHandlers.contains(queuedHandler.handler)) {
logger.debug("execution of handler still waiting, because other call is still running: {}", wrapper);
} else {
logger.debug("executing postponed handler because it is no longer blocked: {}", wrapper);
iterator.remove();
executeSynchronized(queuedHandler.event, queuedHandler.handler);
break;
}
}
}

/**
* Triggers the shutdown of the {@link #executor} as soon as all events wating inside
* {@link #queuedEvents} are executed.
*/
void shutdown() {
executor.execute(() -> {
synchronized (this) {
if (queuedEvents.isEmpty()) {
logger.debug("no more handlers queued; shutting down executors");
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
} else {
logger.debug("queued handlers found; postponing shutdown");
shutdown();
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.github.legman;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
Expand Down Expand Up @@ -46,10 +45,7 @@ public SynchronizedEventHandler(EventBus eventBus, Object target, Method method,
}

@Override
public void handleEvent(Object event) throws InvocationTargetException {
// https://code.google.com/p/guava-libraries/issues/detail?id=1403
synchronized (this) {
super.handleEvent(event);
}
boolean hasToBeSynchronized() {
return true;
}
}
Loading