Permalink
Browse files

[breaking] EventCoalescor is now abstract and generified

The default implementation, the old EventCoalescor, is implemented as
a private static inner class of the EventCoalescor, it's an implementation
called "generic". It might be built, using the factory method:
"EventCoalescor.generic"

So FileWatchEventCoalescor inherit from EventCoalescor for the type FileWatchEvent.
  • Loading branch information...
a-peyrard committed Feb 18, 2015
1 parent 59454c0 commit bd95736a60d5fe7b92f8718d27b6e5f43040d317
@@ -49,7 +49,7 @@ public boolean isEnabled() {
private final WatchService watcher;
private final Map<WatchKey, Path> keys;
private final boolean recursive;
private final EventCoalescor coalescor;
private final EventCoalescor<Object> coalescor;
private final Path root;
private boolean trace = false;
@@ -98,7 +98,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
this.keys = new HashMap<>();
this.recursive = settings.recurse();
this.root = dir;
this.coalescor = new EventCoalescor(eventBus, settings.coalescePeriod());
this.coalescor = EventCoalescor.generic(eventBus, settings.coalescePeriod());
if (recursive) {
registerAll(dir);
@@ -22,39 +22,63 @@
* Other similar events occuring within the period are simply not discarded.
* </p>
*/
public class EventCoalescor implements Closeable {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final EventBus eventBus;
private final long coalescePeriod;
public abstract class EventCoalescor<T> implements Closeable {
private final Set<Object> queue = new LinkedHashSet<>();
/**
* Create an instance of an {@link EventCoalescor} which accept all kind of events,
* as it is untyped.
*
* @param eventBus the event bus where to post processed events
* @param coalescePeriod the coalesce period
* @return the generic event coalescor
*/
public static EventCoalescor<Object> generic(EventBus eventBus, long coalescePeriod) {
return new GenericEventCoalescor(eventBus, coalescePeriod);
}
public EventCoalescor(EventBus eventBus, long coalescePeriod) {
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
final EventBus eventBus;
final long coalescePeriod;
EventCoalescor(EventBus eventBus, long coalescePeriod) {
this.eventBus = eventBus;
this.coalescePeriod = coalescePeriod;
}
public void post(final Object event) {
synchronized (queue) {
if (queue.add(event)) {
executor.schedule(new Runnable() {
@Override
public void run() {
try {
eventBus.post(event);
} finally {
synchronized (queue) {
queue.remove(event);
}
}
}
}, coalescePeriod, TimeUnit.MILLISECONDS);
}
}
}
public abstract void post(final T event);
@Override
public void close() throws IOException {
executor.shutdownNow();
}
/**
* generic coalescor, using untyped events
*/
private static class GenericEventCoalescor extends EventCoalescor<Object> {
private final Set<Object> queue = new LinkedHashSet<>();
private GenericEventCoalescor(EventBus eventBus, long coalescePeriod) {
super(eventBus, coalescePeriod);
}
public void post(final Object event) {
synchronized (queue) {
if (queue.add(event)) {
executor.schedule(new Runnable() {
@Override
public void run() {
try {
eventBus.post(event);
} finally {
synchronized (queue) {
queue.remove(event);
}
}
}
}, coalescePeriod, TimeUnit.MILLISECONDS);
}
}
}
}
}
@@ -2,15 +2,11 @@
import com.google.common.eventbus.EventBus;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -25,17 +21,12 @@
*
* @author apeyrard
*/
public class FileWatchEventCoalescor implements Closeable {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final EventBus eventBus;
private final long coalescePeriod;
public class FileWatchEventCoalescor extends EventCoalescor<FileWatchEvent> {
private final HashMap<FileWatchEventKey, Deque<EventReference>> queue = new HashMap<>();
public FileWatchEventCoalescor(EventBus eventBus, long coalescePeriod) {
this.eventBus = eventBus;
this.coalescePeriod = coalescePeriod;
super(eventBus, coalescePeriod);
}
/**
@@ -157,11 +148,6 @@ void clear() {
}
}
@Override
public void close() throws IOException {
executor.shutdownNow();
}
/**
* key used for the storage of an event, composed by file paths, two event with same keys, are for the same physical file
*/
@@ -44,7 +44,7 @@ public boolean isEnabled() {
private final WatchService watcher;
private final Map<WatchKey,Path> keys;
private final boolean recursive;
private final EventCoalescor coalescor;
private final EventCoalescor<Object> coalescor;
private final Path root;
private boolean trace = false;
@@ -96,7 +96,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
this.keys = new HashMap<>();
this.recursive = settings.recurse();
this.root = dir;
this.coalescor = new EventCoalescor(eventBus, settings.coalescePeriod());
this.coalescor = EventCoalescor.generic(eventBus, settings.coalescePeriod());
if (recursive) {
registerAll(dir);
@@ -1,14 +1,15 @@
package restx.common.watch;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/**
* User: xavierhanin
* Date: 9/11/13
@@ -27,7 +28,7 @@ public void onMessage(String msg) {
}
});
EventCoalescor coalescor = new EventCoalescor(eventBus, 30);
EventCoalescor coalescor = EventCoalescor.generic(eventBus, 30);
coalescor.post("test1");
coalescor.post("test2");

0 comments on commit bd95736

Please sign in to comment.