ISPN-9589 Client Listener notifications can deadlock #6318
```diff
@@ -16,6 +16,7 @@
 import org.infinispan.server.core.ServerConstants;
 import org.infinispan.server.hotrod.logging.Log;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.DecoderException;
```
```diff
@@ -62,11 +63,15 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
       super.channelWritabilityChanged(ctx);
+      Channel channel = ctx.channel();
+      boolean writeable = channel.isWritable();
       if (trace) {
-         log.tracef("Channel %s writability changed", ctx.channel());
+         log.tracef("Channel %s writability changed to %b", channel, writeable);
       }
-      server.getClientListenerRegistry().findAndWriteEvents(ctx.channel());
-      server.getClientCounterNotificationManager().channelActive(ctx.channel());
+      if (writeable) {
+         server.getClientListenerRegistry().findAndWriteEvents(channel);
+         server.getClientCounterNotificationManager().channelActive(channel);
+      }
    }
 
    @Override
```

Review thread on the `findAndWriteEvents(channel)` line:

> Yes, but …

> Oops, I've seen the method name at least 10 times and only now I see that the method is also called when the channel becomes non-writable :) But I think …

> Hrmm, it only invokes … I didn't write this, but I don't see why we couldn't have a …

> Yeah, I was thinking about the iteration, which always has to happen in the event loop, whatever that means :)

> Ok, so I finally started reading a bit about the Netty thread model and I saw my expectations were completely wrong. I didn't realize a channel was actually bound to a single thread; I thought the event loop was only bound to a thread pool and every event arrived on another thread in the pool, something like the 5.0 thread model, only with regular thread pools instead of …
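To make the thread-model point above concrete: a Netty `Channel` is registered with a single `EventLoop` thread for its whole lifetime, and all of its handler callbacks fire on that one thread. A minimal sketch of the resulting idiom (the helper class is hypothetical; the Netty calls are real):

```java
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

// Illustrative helper: every Channel is bound to one EventLoop thread, so
// channel state touched only from that thread needs no extra synchronization.
final class EventLoopConfinement {
   static void runOnChannelThread(Channel channel, Runnable task) {
      EventLoop loop = channel.eventLoop();
      if (loop.inEventLoop()) {
         // Already on the channel's dedicated thread
         task.run();
      } else {
         // Hop onto the channel's thread; tasks submitted here run sequentially
         loop.execute(task);
      }
   }
}
```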
Review thread on the `channelActive(channel)` line:

> Maybe we should check that the channel is still writable after sending the events?

> Not sure what you mean here. Unfortunately, the channel can become non-writeable for many different reasons, since it is shared by many different operations. The way it works currently is that we submit to the event loop if it is writeable; there is no guarantee those events are written, though, as those event writes are queued. Another operation could fully exhaust the outbound buffer, making it not writeable again before the events fire. Thus we need this callback to try to write those events again once the channel becomes writeable.

> I meant it's much more likely that the channel is no longer active after we wrote some events than right after we got the notification that the channel is active. Unless …

> Not sure if this is intermingled with the other comment chain now? But yes, I agree that the channel will most likely not be writeable after exhausting all 100+ elements from the event queue. But I don't understand what change you are proposing here. If the channel is no longer writeable after writing the events, we just wait until we get the callback telling us the channel is writeable again to continue writing more.

> Sorry, I wrote this before I realized that …
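The mechanism discussed above is Netty's standard writability-based backpressure: write only while `isWritable()` is true, stop when the outbound buffer passes its high-water mark, and resume from `channelWritabilityChanged()`. A rough sketch of that loop, with hypothetical names (`BackpressuredWriter`, `pending`), not the PR's actual classes:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative handler: write only while the outbound buffer has room, then
// resume from the writability callback once the client catches up.
final class BackpressuredWriter extends ChannelInboundHandlerAdapter {
   private final Queue<Object> pending = new ConcurrentLinkedQueue<>();

   // Must run on the event loop; drains as much as the buffer allows.
   private void drain(Channel ch) {
      boolean wrote = false;
      while (ch.isWritable() && !pending.isEmpty()) {
         ch.write(pending.poll());
         wrote = true;
      }
      if (wrote) {
         ch.flush();
      }
   }

   @Override
   public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
      if (ctx.channel().isWritable()) {
         // Outbound buffer fell below the low-water mark: pick up where we stopped
         drain(ctx.channel());
      }
      super.channelWritabilityChanged(ctx);
   }
}
```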
```diff
@@ -51,9 +51,11 @@
 import org.infinispan.notifications.cachelistener.filter.KeyValueFilterConverterAsCacheEventFilterConverter;
 import org.infinispan.server.hotrod.logging.Log;
 import org.infinispan.util.KeyValuePair;
+import org.infinispan.util.concurrent.TimeoutException;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
 
 /**
  * @author Galder Zamarreño
```
```diff
@@ -324,6 +326,7 @@ boolean hasChannel(Channel channel) {
       return ch == channel;
    }
 
+   // This method can only be invoked from the Event Loop thread!
    void writeEventsIfPossible() {
       boolean written = false;
       while (!eventQueue.isEmpty() && ch.isWritable()) {
```
```diff
@@ -388,16 +391,37 @@ void sendEvent(byte[] key, byte[] value, long dataVersion, CacheEntryEvent event
       if (isTrace)
         log.tracef("Queue event %s, before queuing event queue size is %d", remoteEvent, eventQueue.size());
 
-      boolean waitingForFlush = !ch.isWritable();
-      try {
-         eventQueue.put(remoteEvent);
-      } catch (InterruptedException e) {
-         throw new CacheException(e);
-      }
+      EventLoop loop = ch.eventLoop();
+      // We shouldn't be in the event loop, but just in case we can't get stuck blocking on putting into the queue
+      // so we offer and try to catch up on events if possible
+      if (loop.inEventLoop()) {
```
Review thread on the `if (loop.inEventLoop())` branch:

> IMO we could just throw an exception instead of complicating the code

> Unfortunately we can't really do that. The problem is someone could add a sync listener after the …

> TBH I'm still a little confused why we would ever keep the event loop busy while we are running any cache operation. I mean that prevents any other thread from reading from the channel, so the server is going to spend a lot of time twiddling its thumbs unless there are lots of clients per server. I understand that we wouldn't want to incur a context switch to process short requests, but couldn't we allow another thread to start reading from/writing to the channel while we are doing some other work?

> That is just the way @rvansa wrote it when he changed it over to share the connection. I originally wrote it the way you said, except gets and ping were exceptions allowing reads on the io thread (Line 47 in 1283927). When things moved to using the async stack more, Radim felt it would be better to allow all operations that could be async to do this. This PR/bug is a side effect of missing a case :) I am not sure if what we have now is better or worse than before (assuming no bugs with blocking). But maybe write operations are too big even when only CPU bound?

> We should trigger a perf ack job with and without passing requests to another thread pool again, but I'd argue that even the serialization of read results could be heavy enough to justify doing it on another thread.
```diff
+         boolean offered = eventQueue.offer(remoteEvent);
+         while (!offered) {
+            // If the event queue is full, we have to try to write some events to free up space since we are in the
+            // event loop and no other thread can drain this queue but us
+            writeEventsIfPossible();
+            // We again try to offer, but if we weren't able to write any events this will not offer - so we
+            // have to wait for the client to catch up to us - we put a little wait to not cause CPU to spin
+            try {
+               offered = eventQueue.offer(remoteEvent, 1, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+               throw new CacheException(e);
+            }
+         }
+      } else {
+         try {
+            // TODO: replace with a better number
+            if (!eventQueue.offer(remoteEvent, 30, TimeUnit.SECONDS)) {
+               throw new TimeoutException("Timed out attempting to offer remote event into queue");
+            }
+         } catch (InterruptedException e) {
+            throw new CacheException(e);
+         }
+      }
 
-      if (!waitingForFlush) {
+      if (ch.isWritable()) {
          // Make sure we write any event in main event loop
-         ch.eventLoop().submit(writeEventsIfPossible);
+         loop.submit(writeEventsIfPossible);
       }
    }
```
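The deadlock in the issue title is the case this hunk guards against: a blocking `eventQueue.put()` on the event loop thread can never return, because `writeEventsIfPossible()`, the only consumer of the queue, runs on that same thread. A self-contained sketch of the offer-and-drain workaround under that assumption (class and method names here are illustrative, not the PR's actual code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model of the fix: the queue's only consumer is the event loop
// itself, so a blocking put() from that thread is a classic self-deadlock.
final class QueueOrDrain {
   private final BlockingQueue<Object> eventQueue = new ArrayBlockingQueue<>(100);

   // On the event loop: never block indefinitely; drain our own queue to make room.
   void enqueueOnEventLoop(Object event) throws InterruptedException {
      while (!eventQueue.offer(event, 1, TimeUnit.MILLISECONDS)) {
         drainSome(); // nobody else can free space, so we must
      }
   }

   // Off the event loop: a bounded wait turns a stuck client into a timeout.
   void enqueueFromOtherThread(Object event) throws InterruptedException {
      if (!eventQueue.offer(event, 30, TimeUnit.SECONDS)) {
         throw new IllegalStateException("Timed out attempting to offer remote event into queue");
      }
   }

   private void drainSome() {
      // stand-in for writeEventsIfPossible(): write queued events while the
      // channel is writable; here we just discard one to keep the sketch short
      eventQueue.poll();
   }
}
```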
```diff
@@ -10,6 +10,7 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.lang.annotation.Annotation;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
```
```diff
@@ -19,7 +20,6 @@
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
```
```diff
@@ -67,6 +67,13 @@
 import org.infinispan.metadata.Metadata;
 import org.infinispan.multimap.impl.EmbeddedMultimapCache;
 import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.CacheEntryListenerInvocation;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.notifications.cachelistener.CacheNotifierImpl;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
 import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
 import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
```
```diff
@@ -93,6 +100,8 @@
 import org.infinispan.upgrade.RollingUpgradeManager;
 import org.infinispan.util.KeyValuePair;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOutboundHandler;
```
```diff
@@ -117,13 +126,20 @@ public HotRodServer() {
       super("HotRod");
    }
 
+   private static Map<String, CacheInfo> expirationMap(long time, TimeUnit unit) {
+      com.github.benmanes.caffeine.cache.Cache<String, CacheInfo> cache = Caffeine.newBuilder()
+            .expireAfterWrite(time, unit).build();
+      return cache.asMap();
+   }
+
    private Address clusterAddress;
    private ServerAddress address;
    private Cache<Address, ServerAddress> addressCache;
    private Map<String, AdvancedCache> knownCaches = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    private Map<String, Configuration> knownCacheConfigurations = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    private Map<String, ComponentRegistry> knownCacheRegistries = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
-   private final Map<String, CacheInfo> cacheInfo = new ConcurrentHashMap<>();
+   // This needs to be an expiration map so we can update knowledge about if sync listeners are registered
+   private final Map<String, CacheInfo> cacheInfo = expirationMap(10, TimeUnit.SECONDS);
    private QueryFacade queryFacade;
    private Map<String, SaslServerFactory> saslMechFactories = CollectionFactory.makeConcurrentMap(4, 0.9f, 16);
    private ClientListenerRegistry clientListenerRegistry;
```
Review thread on the expiration-map comment:

> Why do you need expiration exactly?

> So the reason for expiration is due to the fact that this … There may be a way to plug into listener installation to see if one is sync, but I have my doubts about that working. And as I mentioned in my comment below, when I refactor listeners for 10.0 this is no longer needed and can be reverted to a regular …

Review thread on the new `cacheInfo` field:

> I don't like this approach; it means we could have event loop threads processing write requests for 10s after the listener is added. Since we don't really care about all cache listeners, only about client listeners, and only on the originator, we could intercept client listener add requests here and update the map synchronously.

> Hrmm, we care about all sync listeners; it doesn't matter where they come from. Otherwise we would be blocking the io event loop for user listeners, cluster listeners or others as well. The two types mentioned are added outside of this class. I will see if I can do something else though.

> I am not sure of an easy way to do this without changing a bunch of stuff. Given that this is temporary anyway, I would rather touch as little as possible for now.

> I was thinking this is the server, so we shouldn't have any sync or async listeners except for the client listeners, but I guess it's fine if you add a comment that it's all going away in 10 so I can hold you to it :)

> A client listener is registered as a sync cluster listener on other nodes, which is the big case this misses. Technically this shouldn't be an issue, as you mentioned, since it should be the primary owner, but sometimes it may not be :)

> I was thinking that even on the primary owner, the client listener is triggered only after the backups ack the write, on a remote thread. But your approach is definitely safer: the primary could temporarily be the only owner, and we don't want to make too many assumptions about how distribution works here in the server.
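For reference, the `expirationMap` above leans on Caffeine's `Cache.asMap()` view honoring `expireAfterWrite`: a stale `CacheInfo` (including its `syncListener` flag) simply disappears after 10 seconds, and the next `getCacheInfo` call recomputes it. A minimal standalone sketch of that behavior (class and variable names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Illustrative: a ConcurrentMap view over a Caffeine cache whose entries
// vanish a fixed time after being written, forcing recomputation on next use.
final class ExpiringMapExample {
   static <V> Map<String, V> expirationMap(long time, TimeUnit unit) {
      Cache<String, V> cache = Caffeine.newBuilder()
            .expireAfterWrite(time, unit)
            .build();
      return cache.asMap();
   }

   public static void main(String[] args) throws InterruptedException {
      Map<String, Boolean> syncListenerKnown = expirationMap(10, TimeUnit.SECONDS);
      syncListenerKnown.put("myCache", Boolean.FALSE);
      Thread.sleep(TimeUnit.SECONDS.toMillis(11));
      // The entry has expired; a null here signals "recompute now"
      System.out.println(syncListenerKnown.get("myCache")); // prints null
   }
}
```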
```diff
@@ -424,6 +440,18 @@ public void cacheStopped(String cacheName) {
       cacheInfo.keySet().stream().filter(k -> k.startsWith(cacheName)).forEach(cacheInfo::remove);
    }
 
+   boolean hasSyncListener(CacheNotifierImpl<?, ?> cacheNotifier) {
+      for (Class<? extends Annotation> annotation :
+            new Class[]{CacheEntryCreated.class, CacheEntryRemoved.class, CacheEntryExpired.class, CacheEntryModified.class}) {
+         for (CacheEntryListenerInvocation invocation : cacheNotifier.getListenerCollectionForAnnotation(annotation)) {
+            if (invocation.isSync()) {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+
    public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader header) {
       // Fetching persistence manager would require security action, and would be too expensive
       CacheInfo info = cacheInfo.get(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype());
```
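In user terms, `hasSyncListener` is detecting listeners like the first one below: Infinispan's `@Listener` annotation defaults to `sync = true`, meaning the notification runs on the thread performing the write, which is exactly the work that must stay off the Netty event loop. A hedged sketch of the two registration flavors (the listener classes themselves are hypothetical examples):

```java
import org.infinispan.Cache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;

// Illustrative listener pair: hasSyncListener() reports true when any
// registered listener is synchronous, like SyncListener below.
final class ListenerFlavors {

   @Listener // sync = true by default: notification runs on the writing thread
   public static final class SyncListener {
      @CacheEntryCreated
      public void created(CacheEntryCreatedEvent<byte[], byte[]> event) {
         // executing here blocks the thread that performed the write
      }
   }

   @Listener(sync = false) // dispatched to the async notification executor
   public static final class AsyncListener {
      @CacheEntryCreated
      public void created(CacheEntryCreatedEvent<byte[], byte[]> event) {
         // the writing thread has already moved on
      }
   }

   static void register(Cache<byte[], byte[]> cache) {
      cache.addListener(new SyncListener());  // flips hasSyncListener() to true
      cache.addListener(new AsyncListener()); // alone, it would stay false
   }
}
```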
```diff
@@ -432,12 +460,14 @@ public CacheInfo getCacheInfo(AdvancedCache<byte[], byte[]> cache, HotRodHeader
             .noFlags().withFlags(LOCAL_NON_BLOCKING_GET);
       if (cache.getStatus() != ComponentStatus.RUNNING) {
          // stay on the safe side
-         return new CacheInfo(localNonBlocking, true, true);
+         return new CacheInfo(localNonBlocking, true, true, true);
```

Review thread on the `return new CacheInfo(...)` line:

> I wonder which parameter the comment above refers to...

> :) I was assuming it was all of them, since the optimizations only fire if the values are false.

> I don't think this check is needed; we can't get a reference to a cache before it's started, and if it's stopping/stopped we'll throw an exception immediately.

> I have no idea personally; it's just how it was added by @rvansa. He would know more about why this check is present. But I would be up for removing it; however, I wouldn't want to do that in this PR.

> Why not? 😃
```diff
       }
       ComponentRegistry cr = SecurityActions.getCacheComponentRegistry(cache);
 
       PersistenceManager pm = cr.getComponent(PersistenceManager.class);
       boolean hasIndexing = SecurityActions.getCacheConfiguration(cache).indexing().index().isEnabled();
-      info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing);
+      CacheNotifierImpl cacheNotifier = (CacheNotifierImpl) cr.getComponent(CacheNotifier.class);
+      info = new CacheInfo(localNonBlocking, pm.isEnabled(), hasIndexing, hasSyncListener(cacheNotifier));
       cacheInfo.put(cache.getName() + header.getKeyMediaType().getTypeSubtype() + header.getValueMediaType().getTypeSubtype(), info);
    }
    return info;
```
```diff
@@ -668,11 +698,14 @@ public static class CacheInfo {
       final AdvancedCache<byte[], byte[]> localNonBlocking;
       final boolean persistence;
       final boolean indexing;
+      final boolean syncListener;
 
-      CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing) {
+      CacheInfo(AdvancedCache<byte[], byte[]> localNonBlocking, boolean persistence, boolean indexing,
+                boolean syncListener) {
          this.localNonBlocking = localNonBlocking;
          this.persistence = persistence;
          this.indexing = indexing;
+         this.syncListener = syncListener;
       }
 
       AdvancedCache<byte[], byte[]> localNonBlocking(Subject subject) {
```
Review thread on the pull request:

> Seems like you're fixing ISPN-7087 indirectly here?

> Yeah, let me make a separate commit for it.