Skip to content

Commit

Permalink
ISPN-9589 Client Listener notifications can deadlock
Browse files Browse the repository at this point in the history
* Sync listeners can block as well - fire on executor
  • Loading branch information
wburns committed Oct 5, 2018
1 parent e6a3142 commit 1b26b5a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
Expand Up @@ -5,12 +5,14 @@
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.NearCacheConfiguration;
import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
Expand Down Expand Up @@ -170,6 +172,12 @@ public void handleRemovedEvent(ClientCacheEntryRemovedEvent<K> event) {
invalidate(event.getKey());
}

@ClientCacheEntryExpired
@SuppressWarnings("unused")
public void handleExpiredEvent(ClientCacheEntryExpiredEvent<K> event) {
invalidate(event.getKey());
}

@ClientCacheFailover
@SuppressWarnings("unused")
public void handleFailover(ClientCacheFailoverEvent e) {
Expand Down Expand Up @@ -228,6 +236,7 @@ private <T> T unmarshallObject(byte[] bytes, String element) {
}

@ClientCacheEntryRemoved
@ClientCacheEntryExpired
@SuppressWarnings("unused")
public void handleRemovedEvent(ClientCacheEntryCustomEvent<byte[]> e) {
ByteBuffer in = ByteBuffer.wrap(e.getEventData());
Expand Down
Expand Up @@ -51,7 +51,7 @@ private boolean isBlockingRead(CacheInfo info, HotRodHeader header) {
private boolean isBlockingWrite(AdvancedCache<byte[], byte[]> cache, HotRodHeader header) {
CacheInfo info = server.getCacheInfo(cache, header);
// Note: cache store cannot be skipped (yet)
return info.persistence || info.indexing && !header.isSkipIndexing();
return info.persistence || info.indexing && !header.isSkipIndexing() || info.syncListener;
}

void ping(HotRodHeader header, Subject subject) {
Expand Down
Expand Up @@ -10,6 +10,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
Expand All @@ -19,7 +20,6 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -67,6 +67,13 @@
import org.infinispan.metadata.Metadata;
import org.infinispan.multimap.impl.EmbeddedMultimapCache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheEntryListenerInvocation;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.CacheNotifierImpl;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
Expand All @@ -93,6 +100,8 @@
import org.infinispan.upgrade.RollingUpgradeManager;
import org.infinispan.util.KeyValuePair;

import com.github.benmanes.caffeine.cache.Caffeine;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
Expand All @@ -117,13 +126,21 @@ public HotRodServer() {
super("HotRod");
}

private static Map<String, CacheInfo> expirationMap(long time, TimeUnit unit) {
com.github.benmanes.caffeine.cache.Cache<String, CacheInfo> cache = Caffeine.newBuilder()
.expireAfterWrite(time, unit).build();
return cache.asMap();
}

private Address clusterAddress;
private ServerAddress address;
private Cache<Address, ServerAddress> addressCache;
private Map<String, AdvancedCache> knownCaches = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private Map<String, Configuration> knownCacheConfigurations = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private Map<String, ComponentRegistry> knownCacheRegistries = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private final Map<String, CacheInfo> cacheInfo = new ConcurrentHashMap<>();
// This needs to be an expiration map to allow for knowledge of dynamic cache properties such as
// sync listeners which can change over time
private final Map<String, CacheInfo> cacheInfo = expirationMap(5, TimeUnit.SECONDS);
private QueryFacade queryFacade;
private Map<String, SaslServerFactory> saslMechFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private ClientListenerRegistry clientListenerRegistry;
Expand Down Expand Up @@ -424,6 +441,18 @@ public void cacheStopped(String cacheName) {
cacheInfo.keySet().stream().filter(k -> k.startsWith(cacheName)).forEach(cacheInfo::remove);
}

boolean hasSyncListener(CacheNotifierImpl<?, ?> cacheNotifier) {
for (Class<? extends Annotation> annotation :
new Class[]{CacheEntryCreated.class, CacheEntryRemoved.class, CacheEntryExpired.class, CacheEntryModified.class}) {
for (CacheEntryListenerInvocation invocation : cacheNotifier.getListenerCollectionForAnnotation(annotation)) {
if (invocation.isSync()) {
return true;
}
}
}
return false;
}

public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader header) {
// Fetching persistence manager would require security action, and would be too expensive
CacheInfo info = cacheInfo.get(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype());
Expand All @@ -432,12 +461,14 @@ public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader
.noFlags().withFlags(LOCAL_NON_BLOCKING_GET);
if (cache.getStatus() != ComponentStatus.RUNNING) {
// stay on the safe side
return new CacheInfo(localNonBlocking, true, true);
return new CacheInfo(localNonBlocking, true, true, true);
}
ComponentRegistry cr = SecurityActions.getCacheComponentRegistry(cache);

PersistenceManager pm = cr.getComponent(PersistenceManager.class);
boolean hasIndexing = SecurityActions.getCacheConfiguration(cache).indexing().index().isEnabled();
info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing);
CacheNotifierImpl cacheNotifier = (CacheNotifierImpl) cr.getComponent(CacheNotifier.class);
info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing, hasSyncListener(cacheNotifier));
cacheInfo.put(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype(), info);
}
return info;
Expand Down Expand Up @@ -668,11 +699,14 @@ public static class CacheInfo {
final AdvancedCache<byte[], byte[]> localNonBlocking;
final boolean persistence;
final boolean indexing;
final boolean syncListener;

CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing) {
CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing,
boolean syncListener) {
this.localNonBlocking = localNonBlocking;
this.persistence = persistence;
this.indexing = indexing;
this.syncListener = syncListener;
}

AdvancedCache<byte[], byte[]> localNonBlocking(Subject subject) {
Expand Down

0 comments on commit 1b26b5a

Please sign in to comment.