Skip to content

Commit

Permalink
ISPN-9589 Client Listener notifications can deadlock
Browse files Browse the repository at this point in the history
* Sync listeners can block as well - fire on executor
  • Loading branch information
wburns committed Oct 10, 2018
1 parent e6a3142 commit 6263fe8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 18 deletions.
Expand Up @@ -5,12 +5,14 @@
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.NearCacheConfiguration;
import org.infinispan.client.hotrod.event.ClientCacheEntryCustomEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
Expand Down Expand Up @@ -170,6 +172,12 @@ public void handleRemovedEvent(ClientCacheEntryRemovedEvent<K> event) {
invalidate(event.getKey());
}

@ClientCacheEntryExpired
@SuppressWarnings("unused")
public void handleExpiredEvent(ClientCacheEntryExpiredEvent<K> event) {
invalidate(event.getKey());
}

@ClientCacheFailover
@SuppressWarnings("unused")
public void handleFailover(ClientCacheFailoverEvent e) {
Expand Down Expand Up @@ -228,6 +236,7 @@ private <T> T unmarshallObject(byte[] bytes, String element) {
}

@ClientCacheEntryRemoved
@ClientCacheEntryExpired
@SuppressWarnings("unused")
public void handleRemovedEvent(ClientCacheEntryCustomEvent<byte[]> e) {
ByteBuffer in = ByteBuffer.wrap(e.getEventData());
Expand Down
Expand Up @@ -16,6 +16,7 @@
import org.infinispan.server.core.ServerConstants;
import org.infinispan.server.hotrod.logging.Log;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -62,11 +63,15 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
Channel channel = ctx.channel();
boolean writeable = channel.isWritable();
if (trace) {
log.tracef("Channel %s writability changed", ctx.channel());
log.tracef("Channel %s writability changed to %b", channel, writeable);
}
if (writeable) {
server.getClientListenerRegistry().findAndWriteEvents(channel);
server.getClientCounterNotificationManager().channelActive(channel);
}
server.getClientListenerRegistry().findAndWriteEvents(ctx.channel());
server.getClientCounterNotificationManager().channelActive(ctx.channel());
}

@Override
Expand Down
Expand Up @@ -51,7 +51,7 @@ private boolean isBlockingRead(CacheInfo info, HotRodHeader header) {
private boolean isBlockingWrite(AdvancedCache<byte[], byte[]> cache, HotRodHeader header) {
CacheInfo info = server.getCacheInfo(cache, header);
// Note: cache store cannot be skipped (yet)
return info.persistence || info.indexing && !header.isSkipIndexing();
return info.persistence || info.indexing && !header.isSkipIndexing() || info.syncListener;
}

void ping(HotRodHeader header, Subject subject) {
Expand Down
Expand Up @@ -54,6 +54,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

/**
* @author Galder Zamarreño
Expand Down Expand Up @@ -324,6 +325,7 @@ boolean hasChannel(Channel channel) {
return ch == channel;
}

// This method can only be invoked from the Event Loop thread!
void writeEventsIfPossible() {
boolean written = false;
while (!eventQueue.isEmpty() && ch.isWritable()) {
Expand Down Expand Up @@ -388,16 +390,38 @@ void sendEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event
if (isTrace)
log.tracef("Queue event %s, before queuing event queue size is %d", remoteEvent, eventQueue.size());

boolean waitingForFlush = !ch.isWritable();
try {
eventQueue.put(remoteEvent);
} catch (InterruptedException e) {
throw new CacheException(e);
}
EventLoop loop = ch.eventLoop();
// We shouldn't be in the event loop, but just in case we can't get stuck blocking on putting into the queue
// so we offer and try to catch up on events if possible
if (loop.inEventLoop()) {
boolean offered = eventQueue.offer(remoteEvent);
while (!offered) {
// If the event queue is full, we have to try to write some events to free up space since we are in the
// event loop and no other thread can drain this queue but us
writeEventsIfPossible();
// We again try to offer, but if we weren't able to write any events this will not offer - so we
// have to wait for the client to catch up to us - we put a little wait to not cause CPU to spin
try {
offered = eventQueue.offer(remoteEvent, 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new CacheException(e);
}
}
if (ch.isWritable()) {
// Now that the entry is finally in eventQueue, submit the events to be written at a later point
loop.submit(writeEventsIfPossible);
}
} else {
try {
eventQueue.put(remoteEvent);
} catch (InterruptedException e) {
throw new CacheException(e);
}

if (!waitingForFlush) {
// Make sure we write any event in main event loop
ch.eventLoop().submit(writeEventsIfPossible);
if (ch.isWritable()) {
// Make sure we write any event in main event loop
loop.submit(writeEventsIfPossible);
}
}
}

Expand Down
Expand Up @@ -10,6 +10,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
Expand All @@ -19,7 +20,6 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -67,6 +67,13 @@
import org.infinispan.metadata.Metadata;
import org.infinispan.multimap.impl.EmbeddedMultimapCache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheEntryListenerInvocation;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.CacheNotifierImpl;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
Expand All @@ -93,6 +100,8 @@
import org.infinispan.upgrade.RollingUpgradeManager;
import org.infinispan.util.KeyValuePair;

import com.github.benmanes.caffeine.cache.Caffeine;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
Expand All @@ -117,13 +126,20 @@ public HotRodServer() {
super("HotRod");
}

private static Map<String, CacheInfo> expirationMap(long time, TimeUnit unit) {
com.github.benmanes.caffeine.cache.Cache<String, CacheInfo> cache = Caffeine.newBuilder()
.expireAfterWrite(time, unit).build();
return cache.asMap();
}

private Address clusterAddress;
private ServerAddress address;
private Cache<Address, ServerAddress> addressCache;
private Map<String, AdvancedCache> knownCaches = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private Map<String, Configuration> knownCacheConfigurations = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private Map<String, ComponentRegistry> knownCacheRegistries = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private final Map<String, CacheInfo> cacheInfo = new ConcurrentHashMap<>();
// This needs to be an expiration map so we can update knowledge about if sync listeners are registered
private final Map<String, CacheInfo> cacheInfo = expirationMap(10, TimeUnit.SECONDS);
private QueryFacade queryFacade;
private Map<String, SaslServerFactory> saslMechFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
private ClientListenerRegistry clientListenerRegistry;
Expand Down Expand Up @@ -424,6 +440,18 @@ public void cacheStopped(String cacheName) {
cacheInfo.keySet().stream().filter(k -> k.startsWith(cacheName)).forEach(cacheInfo::remove);
}

boolean hasSyncListener(CacheNotifierImpl<?, ?> cacheNotifier) {
for (Class<? extends Annotation> annotation :
new Class[]{CacheEntryCreated.class, CacheEntryRemoved.class, CacheEntryExpired.class, CacheEntryModified.class}) {
for (CacheEntryListenerInvocation invocation : cacheNotifier.getListenerCollectionForAnnotation(annotation)) {
if (invocation.isSync()) {
return true;
}
}
}
return false;
}

public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader header) {
// Fetching persistence manager would require security action, and would be too expensive
CacheInfo info = cacheInfo.get(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype());
Expand All @@ -432,12 +460,14 @@ public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader
.noFlags().withFlags(LOCAL_NON_BLOCKING_GET);
if (cache.getStatus() != ComponentStatus.RUNNING) {
// stay on the safe side
return new CacheInfo(localNonBlocking, true, true);
return new CacheInfo(localNonBlocking, true, true, true);
}
ComponentRegistry cr = SecurityActions.getCacheComponentRegistry(cache);

PersistenceManager pm = cr.getComponent(PersistenceManager.class);
boolean hasIndexing = SecurityActions.getCacheConfiguration(cache).indexing().index().isEnabled();
info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing);
CacheNotifierImpl cacheNotifier = (CacheNotifierImpl) cr.getComponent(CacheNotifier.class);
info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing, hasSyncListener(cacheNotifier));
cacheInfo.put(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype(), info);
}
return info;
Expand Down Expand Up @@ -668,11 +698,14 @@ public static class CacheInfo {
final AdvancedCache<byte[], byte[]> localNonBlocking;
final boolean persistence;
final boolean indexing;
final boolean syncListener;

CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing) {
CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing,
boolean syncListener) {
this.localNonBlocking = localNonBlocking;
this.persistence = persistence;
this.indexing = indexing;
this.syncListener = syncListener;
}

AdvancedCache<byte[], byte[]> localNonBlocking(Subject subject) {
Expand Down

0 comments on commit 6263fe8

Please sign in to comment.