-
Notifications
You must be signed in to change notification settings - Fork 612
/
BatchingClusterEventManagerImpl.java
128 lines (110 loc) · 4.91 KB
/
BatchingClusterEventManagerImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package org.infinispan.notifications.cachelistener.cluster.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;

import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.distexec.DistributedExecutionCompletionService;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.factories.annotations.Start;
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.ClusterEventCallable;
import org.infinispan.notifications.cachelistener.cluster.ClusterEventManager;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCallable;
import org.infinispan.remoting.transport.Address;
/**
 * {@link ClusterEventManager} that accumulates cluster events per calling thread and ships them
 * in one submission per target node when {@link #sendEvents()} is invoked.
 * <p>
 * Pending events are held in a {@link ThreadLocal}, so {@link #addEvents}, {@link #sendEvents()}
 * and {@link #dropEvents()} must all be called from the same thread to act on the same batch.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public class BatchingClusterEventManagerImpl<K, V> implements ClusterEventManager<K, V> {
   private final Cache<K, V> cache;

   /** Assigned in {@link #start()}; {@code null} until then. */
   private DistributedExecutorService distExecService;

   /** Batch of events queued by the current thread; unset when the thread has nothing pending. */
   private final ThreadLocal<EventContext<K, V>> localContext = new ThreadLocal<>();

   /**
    * @param cache the cache used to obtain the distributed executor in {@link #start()}
    */
   public BatchingClusterEventManagerImpl(Cache<K, V> cache) {
      this.cache = cache;
   }

   /**
    * Obtains the distributed executor for the cache via {@code SecurityActions}.
    * NOTE(review): presumably invoked by the component lifecycle because of {@code @Start} - confirm.
    */
   @Start
   public void start() {
      distExecService = SecurityActions.getDefaultExecutorService(cache);
   }

   /**
    * Queues events for the given target and listener in the current thread's batch, creating the
    * batch on first use. Nothing is sent until {@link #sendEvents()} is called.
    *
    * @param target     node the events are destined for
    * @param identifier identifier of the listener the events belong to
    * @param events     events to queue
    * @param sync       if {@code true}, {@link #sendEvents()} waits for the submission to this target
    */
   @Override
   public void addEvents(Address target, UUID identifier, Collection<ClusterEvent<K, V>> events, boolean sync) {
      EventContext<K, V> ctx = localContext.get();
      if (ctx == null) {
         ctx = new UnicastEventContext<K, V>();
         localContext.set(ctx);
      }
      ctx.addTargets(target, identifier, events, sync);
   }

   /**
    * Sends every event queued by the current thread and clears the thread's batch. Does nothing if
    * the thread has no batch.
    * <p>
    * The batch is cleared even when sending throws, so a failed batch is neither left attached to
    * the thread nor re-sent together with a later one.
    */
   @Override
   public void sendEvents() {
      EventContext<K, V> ctx = localContext.get();
      if (ctx != null) {
         try {
            ctx.sendToTargets(distExecService);
         } finally {
            localContext.remove();
         }
      }
   }

   /**
    * Discards every event queued by the current thread without sending it.
    */
   @Override
   public void dropEvents() {
      localContext.remove();
   }

   /**
    * A batch of pending events that can be extended and then dispatched.
    */
   private interface EventContext<K, V> {
      /**
       * Adds events for one listener on one target node to the batch.
       */
      void addTargets(Address address, UUID identifier, Collection<ClusterEvent<K, V>> events, boolean sync);

      /**
       * Dispatches everything in the batch using the given executor.
       */
      void sendToTargets(DistributedExecutorService service);
   }

   /**
    * {@link EventContext} that groups events by target address and submits one task per address.
    */
   protected static class UnicastEventContext<K, V> implements EventContext<K, V> {
      protected final Map<Address, TargetEvents<K, V>> targets = new HashMap<>();

      /**
       * Records {@code events} under {@code identifier} for {@code address}. If events were already
       * recorded for the same address and identifier, both sets are combined into a new collection
       * with the previously recorded events first; neither supplied collection is modified.
       * Once any call marks a target as {@code sync}, it stays sync for the rest of the batch.
       */
      @Override
      public void addTargets(Address address, UUID identifier, Collection<ClusterEvent<K, V>> events, boolean sync) {
         TargetEvents<K, V> targetEvents = targets.get(address);
         if (targetEvents == null) {
            targetEvents = new TargetEvents<>();
            targets.put(address, targetEvents);
         }

         Map<UUID, Collection<ClusterEvent<K, V>>> listenerEvents = targetEvents.events;
         // A previous entry is not expected, so put directly and only merge in the rare case one exists.
         Collection<ClusterEvent<K, V>> prevEvents = listenerEvents.put(identifier, events);
         if (prevEvents != null) {
            // Merge into a fresh list: the supplied collections belong to the callers and may be
            // unmodifiable. Older events go first so they are kept in the order they were added.
            Collection<ClusterEvent<K, V>> merged = new ArrayList<>(prevEvents.size() + events.size());
            merged.addAll(prevEvents);
            merged.addAll(events);
            listenerEvents.put(identifier, merged);
         }

         if (sync) {
            targetEvents.sync = true;
         }
      }

      /**
       * Submits one task per target address: a {@link ClusterEventCallable} when the target has
       * events for exactly one listener, a {@link MultiClusterEventCallable} when it has several.
       * Targets without events are skipped.
       * <p>
       * Submissions to sync targets go through a completion service and this method blocks until
       * one completion has been taken for each of them; the taken futures are not inspected.
       * Submissions to other targets are handed to {@code service} and not waited for.
       *
       * @throws CacheException if the thread is interrupted while waiting; the thread's interrupt
       *                        status is restored before the exception is thrown
       */
      @Override
      public void sendToTargets(DistributedExecutorService service) {
         DistributedExecutionCompletionService<Void> completion = new DistributedExecutionCompletionService<Void>(service);
         int syncCount = 0;
         for (Entry<Address, TargetEvents<K, V>> entry : targets.entrySet()) {
            Address address = entry.getKey();
            TargetEvents<K, V> value = entry.getValue();
            int listenerCount = value.events.size();
            if (listenerCount > 1) {
               if (value.sync) {
                  completion.submit(address, new MultiClusterEventCallable<>(value.events));
                  syncCount++;
               } else {
                  service.submit(address, new MultiClusterEventCallable<>(value.events));
               }
            } else if (listenerCount == 1) {
               // Single listener: send the lighter callable carrying just that listener's events.
               Entry<UUID, Collection<ClusterEvent<K, V>>> entryValue = value.events.entrySet().iterator().next();
               if (value.sync) {
                  completion.submit(address, new ClusterEventCallable<K, V>(entryValue.getKey(), entryValue.getValue()));
                  syncCount++;
               } else {
                  service.submit(address, new ClusterEventCallable<K, V>(entryValue.getKey(), entryValue.getValue()));
               }
            }
         }

         try {
            for (int i = 0; i < syncCount; ++i) {
               completion.take();
            }
         } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new CacheException("Interrupted while waiting for event notifications to complete.", e);
         }
      }
   }

   /**
    * Events queued for a single target node, keyed by listener identifier.
    */
   private static class TargetEvents<K, V> {
      final Map<UUID, Collection<ClusterEvent<K, V>>> events = new HashMap<>();
      /** Set once any queued batch for this target asked to be sent synchronously. */
      boolean sync = false;
   }
}