/
ClientListenerNotifier.java
356 lines (312 loc) · 14.5 KB
/
ClientListenerNotifier.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
package org.infinispan.client.hotrod.event;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.equivalence.AnyEquivalence;
import org.infinispan.commons.equivalence.ByteArrayEquivalence;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.Util;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* @author Galder Zamarreño
*/
public class ClientListenerNotifier {
private static final Log log = LogFactory.getLog(ClientListenerNotifier.class, Log.class);
private static final boolean trace = log.isTraceEnabled();
private static final Map<Class<? extends Annotation>, Class<?>[]> allowedListeners = new HashMap<>(4);
static {
allowedListeners.put(ClientCacheEntryCreated.class, new Class[]{ClientCacheEntryCreatedEvent.class, ClientCacheEntryCustomEvent.class});
allowedListeners.put(ClientCacheEntryModified.class, new Class[]{ClientCacheEntryModifiedEvent.class, ClientCacheEntryCustomEvent.class});
allowedListeners.put(ClientCacheEntryRemoved.class, new Class[]{ClientCacheEntryRemovedEvent.class, ClientCacheEntryCustomEvent.class});
allowedListeners.put(ClientCacheEntryExpired.class, new Class[]{ClientCacheEntryExpiredEvent.class, ClientCacheEntryCustomEvent.class});
allowedListeners.put(ClientCacheFailover.class, new Class[]{ClientCacheFailoverEvent.class});
}
private final ConcurrentMap<byte[], EventDispatcher> clientListeners = CollectionFactory.makeConcurrentMap(
ByteArrayEquivalence.INSTANCE, AnyEquivalence.getInstance());
private final ExecutorService executor;
private final Codec codec;
private final Marshaller marshaller;
protected ClientListenerNotifier(ExecutorService executor, Codec codec, Marshaller marshaller) {
this.executor = executor;
this.codec = codec;
this.marshaller = marshaller;
}
public static ClientListenerNotifier create(Codec codec, Marshaller marshaller) {
ExecutorService executor = Executors.newCachedThreadPool(getRestoreThreadNameThreadFactory());
return new ClientListenerNotifier(executor, codec, marshaller);
}
private static ThreadFactory getRestoreThreadNameThreadFactory() {
return r -> new Thread(() -> {
final String originalName = Thread.currentThread().getName();
try {
r.run();
} finally {
Thread.currentThread().setName(originalName);
}
});
}
public Marshaller getMarshaller() {
return marshaller;
}
public void addClientListener(AddClientListenerOperation op) {
Map<Class<? extends Annotation>, List<ClientListenerInvocation>> invocables = findMethods(op.listener);
EventDispatcher eventDispatcher = new EventDispatcher(op, invocables, op.getCacheName());
clientListeners.put(op.listenerId, eventDispatcher);
if (trace)
log.tracef("Add client listener with id %s, for listener %s and invocable methods %s",
Util.printArray(op.listenerId), op.listener, invocables);
}
public void failoverClientListeners(Set<SocketAddress> failedServers) {
// Compile all listener ids that need failing over
List<byte[]> failoverListenerIds = new ArrayList<>();
for (Map.Entry<byte[], EventDispatcher> entry : clientListeners.entrySet()) {
EventDispatcher dispatcher = entry.getValue();
if (failedServers.contains(dispatcher.transport.getRemoteSocketAddress()))
failoverListenerIds.add(entry.getKey());
}
if (trace && failoverListenerIds.isEmpty())
log.tracef("No event listeners registered in faild servers: %s", failedServers);
// Remove tracking listeners and read to the fallback transport
for (byte[] listenerId : failoverListenerIds) {
EventDispatcher dispatcher = clientListeners.get(listenerId);
removeClientListener(listenerId);
// Invoke failover event callback, if presents
invokeFailoverEvent(dispatcher);
// Re-execute adding client listener in one of the remaining nodes
dispatcher.op.execute();
if (trace) {
SocketAddress failedServerAddress = dispatcher.transport.getRemoteSocketAddress();
log.tracef("Fallback listener id %s from a failed server %s to %s",
Util.printArray(listenerId), failedServerAddress,
dispatcher.op.getDedicatedTransport().getRemoteSocketAddress());
}
}
}
private void invokeFailoverEvent(EventDispatcher dispatcher) {
List<ClientListenerInvocation> callbacks = dispatcher.invocables.get(ClientCacheFailover.class);
if (callbacks != null) {
for (ClientListenerInvocation callback : callbacks)
callback.invoke(ClientEvents.mkCachefailoverEvent());
}
}
public void startClientListener(byte[] listenerId) {
EventDispatcher eventDispatcher = clientListeners.get(listenerId);
executor.submit(eventDispatcher);
}
public void removeClientListener(byte[] listenerId) {
EventDispatcher dispatcher = clientListeners.remove(listenerId);
dispatcher.transport.release(); // force shutting it
if (trace)
log.tracef("Remove client listener with id %s", Util.printArray(listenerId));
}
public byte[] findListenerId(Object listener) {
for (EventDispatcher dispatcher : clientListeners.values()) {
if (dispatcher.op.listener.equals(listener))
return dispatcher.op.listenerId;
}
return null;
}
public boolean isListenerConnected(byte[] listenerId) {
EventDispatcher dispatcher = clientListeners.get(listenerId);
// If listener not present, is not active
return dispatcher != null && !dispatcher.stopped;
}
public Transport findTransport(byte[] listenerId) {
EventDispatcher dispatcher = clientListeners.get(listenerId);
if (dispatcher != null)
return dispatcher.transport;
return null;
}
private Map<Class<? extends Annotation>, List<ClientListenerInvocation>> findMethods(Object listener) {
Map<Class<? extends Annotation>, List<ClientListenerInvocation>> listenerMethodMap = new HashMap<>(4, 0.99f);
for (Method m : listener.getClass().getMethods()) {
// loop through all valid method annotations
for (Map.Entry<Class<? extends Annotation>, Class<?>[]> entry : allowedListeners.entrySet()) {
Class<? extends Annotation> annotationType = entry.getKey();
Class<?>[] eventTypes = entry.getValue();
if (m.isAnnotationPresent(annotationType)) {
testListenerMethodValidity(m, eventTypes, annotationType.getName());
SecurityActions.setAccessible(m);
ClientListenerInvocation invocation = new ClientListenerInvocation(listener, m);
List<ClientListenerInvocation> invocables = listenerMethodMap.get(annotationType);
if (invocables == null) {
invocables = new ArrayList<>();
listenerMethodMap.put(annotationType, invocables);
}
invocables.add(invocation);
}
}
}
return listenerMethodMap;
}
private void testListenerMethodValidity(Method m, Class<?>[] allowedParameters, String annotationName) {
boolean isAllowed = false;
for (Class<?> allowedParameter : allowedParameters) {
if (m.getParameterTypes().length == 1 && m.getParameterTypes()[0].isAssignableFrom(allowedParameter)) {
isAllowed = true;
break;
}
}
if (!isAllowed)
throw log.incorrectClientListener(annotationName, Arrays.asList(allowedParameters));
if (!m.getReturnType().equals(void.class))
throw log.incorrectClientListener(annotationName);
}
public Set<Object> getListeners(byte[] cacheName) {
Set<Object> ret = new HashSet<>(clientListeners.size());
for (EventDispatcher dispatcher : clientListeners.values()) {
if (Arrays.equals(dispatcher.op.cacheName, cacheName))
ret.add(dispatcher.op.listener);
}
return ret;
}
public void stop() {
for (byte[] listenerId : clientListeners.keySet()) {
if (trace)
log.tracef("Remote cache manager stopping, remove client listener id %s", Util.printArray(listenerId));
removeClientListener(listenerId);
}
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void invokeEvent(byte[] listenerId, ClientEvent clientEvent) {
EventDispatcher eventDispatcher = clientListeners.get(listenerId);
eventDispatcher.invokeClientEvent(clientEvent);
}
private final class EventDispatcher implements Runnable {
final Map<Class<? extends Annotation>, List<ClientListenerInvocation>> invocables;
final AddClientListenerOperation op;
final Transport transport;
final String cacheName;
volatile boolean stopped = false;
private EventDispatcher(AddClientListenerOperation op,
Map<Class<? extends Annotation>, List<ClientListenerInvocation>> invocables,
String cacheName) {
this.op = op;
this.transport = op.getDedicatedTransport();
this.invocables = invocables;
this.cacheName = cacheName;
}
@Override
public void run() {
Thread.currentThread().setName(getThreadName());
while (!Thread.currentThread().isInterrupted()) {
ClientEvent clientEvent = null;
try {
clientEvent = codec.readEvent(transport, op.listenerId, marshaller);
invokeClientEvent(clientEvent);
// Nullify event, makes it easier to identify network vs invocation error messages
clientEvent = null;
} catch (TransportException e) {
Throwable cause = e.getCause();
if (cause instanceof ClosedChannelException) {
// Channel closed, ignore and exit
log.debug("Channel closed, exiting event reader thread");
stopped = true;
return;
} else if (cause instanceof SocketTimeoutException) {
log.debug("Timed out reading event, retry");
} else if (clientEvent != null) {
log.unexpectedErrorConsumingEvent(clientEvent, e);
} else {
log.unrecoverableErrorReadingEvent(e, transport.getRemoteSocketAddress());
stopped = true;
return; // Server is likely gone!
}
} catch (CancelledKeyException e) {
// Cancelled key exceptions are also thrown when the channel has been closed
log.debug("Key cancelled, most likely channel closed, exiting event reader thread");
stopped = true;
return;
} catch (Throwable t) {
if (clientEvent != null)
log.unexpectedErrorConsumingEvent(clientEvent, t);
else
log.unableToReadEventFromServer(t, transport.getRemoteSocketAddress());
if (!transport.isValid()) {
stopped = true;
return;
}
}
}
}
String getThreadName() {
String listenerId = Util.toHexString(op.listenerId, 8);
return cacheName.isEmpty()
? "Client-Listener-" + listenerId
: "Client-Listener-" + cacheName + "-" + listenerId;
}
void invokeClientEvent(ClientEvent clientEvent) {
if (trace)
log.tracef("Event %s received for listener with id=%s", clientEvent, Util.printArray(op.listenerId));
switch (clientEvent.getType()) {
case CLIENT_CACHE_ENTRY_CREATED:
invokeCallbacks(clientEvent, ClientCacheEntryCreated.class);
break;
case CLIENT_CACHE_ENTRY_MODIFIED:
invokeCallbacks(clientEvent, ClientCacheEntryModified.class);
break;
case CLIENT_CACHE_ENTRY_REMOVED:
invokeCallbacks(clientEvent, ClientCacheEntryRemoved.class);
break;
case CLIENT_CACHE_ENTRY_EXPIRED:
invokeCallbacks(clientEvent, ClientCacheEntryExpired.class);
break;
}
}
private void invokeCallbacks(ClientEvent event, Class<? extends Annotation> type) {
List<ClientListenerInvocation> callbacks = invocables.get(type);
if (callbacks != null) {
for (ClientListenerInvocation callback : callbacks)
callback.invoke(event);
}
}
}
private static final class ClientListenerInvocation {
private static final Log log = LogFactory.getLog(ClientListenerInvocation.class, Log.class);
final Object listener;
final Method method;
private ClientListenerInvocation(Object listener, Method method) {
this.listener = listener;
this.method = method;
}
public void invoke(ClientEvent event) {
try {
method.invoke(listener, event);
} catch (Exception e) {
throw log.exceptionInvokingListener(
e.getClass().getName(), method, listener, e);
}
}
}
}