-
Notifications
You must be signed in to change notification settings - Fork 0
/
AbstractBroadcaster.java
66 lines (56 loc) · 1.97 KB
/
AbstractBroadcaster.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/*
* Copyright 2024 Mart Somermaa
* SPDX-License-Identifier: Apache-2.0
*/
package org.test;
import com.vaadin.flow.shared.Registration;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
@Slf4j
abstract class AbstractBroadcaster<M> {
private final ConcurrentLinkedQueue<Consumer<M>> listeners = new ConcurrentLinkedQueue<>();
@Resource
private ManagedExecutorService executorService;
@Inject
private Event<M> messageEvent;
public Registration register(Consumer<M> listener) {
log.info("{} registering {}", this, listener);
if (!listeners.contains(listener)) {
listeners.add(listener);
return () -> unregister(listener);
} else {
throw new IllegalArgumentException("Listener " + listener + " is already registered");
}
}
public void broadcast(M message) {
messageEvent.fire(message);
}
public void broadcastAsync(M message) {
messageEvent.fireAsync(message);
}
protected void onMessageImpl(M message) {
log.info("{} got message {}, listeners are: {}", this, message, listeners);
for (final Consumer<M> listener : listeners) {
executorService.execute(() -> {
try {
listener.accept(message);
} catch (Exception e) {
log.error("Error processing message in listener: {}", listener, e);
}
});
}
}
private void unregister(Consumer<M> listener) {
log.info("{} unregistering {}", this, listener);
if (listeners.contains(listener)) {
listeners.remove(listener);
} else {
log.warn("Listener {} is not registered, cannot unregister", listener);
}
}
}