diff --git a/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java b/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java index 53fc5b4fe52d..4c90202974ff 100644 --- a/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java +++ b/core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java @@ -8,6 +8,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.infinispan.AdvancedCache; diff --git a/core/src/main/java/org/infinispan/commands/read/GetAllCommand.java b/core/src/main/java/org/infinispan/commands/read/GetAllCommand.java index 979ca7d221b1..1d908ec7a869 100644 --- a/core/src/main/java/org/infinispan/commands/read/GetAllCommand.java +++ b/core/src/main/java/org/infinispan/commands/read/GetAllCommand.java @@ -78,8 +78,11 @@ public Object perform(InvocationContext ctx) throws Throwable { if (entry == null) { if (trace) { log.tracef("Entry for key %s not found", key); - if (ctx.isOriginLocal()) - throw new IllegalStateException("All entries must exist in the context"); + } + // We have to put null even if it isn't in the context. This is because + // context won't have a value for null unless it is repeatable read. + if (ch == null || ch.isKeyLocalToNode(localAddress, key)) { + map.put(key, null); } continue; } diff --git a/core/src/main/java/org/infinispan/interceptors/compat/BaseTypeConverterInterceptor.java b/core/src/main/java/org/infinispan/interceptors/compat/BaseTypeConverterInterceptor.java index 0cb6b5973a96..f745eaafb966 100644 --- a/core/src/main/java/org/infinispan/interceptors/compat/BaseTypeConverterInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/compat/BaseTypeConverterInterceptor.java @@ -23,7 +23,9 @@ import org.infinispan.iteration.EntryIterable; import org.infinispan.metadata.Metadata; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java index a4dab01df828..ab4a86858fef 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java @@ -102,63 +102,85 @@ private Object computeGetReturn(InternalCacheEntry entry, boolean returnEntry) { public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable { if (command.hasFlag(Flag.CACHE_MODE_LOCAL) || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) - || command.hasFlag(Flag.IGNORE_RETURN_VALUES) - || !ctx.isOriginLocal()) { + || command.hasFlag(Flag.IGNORE_RETURN_VALUES)) { return invokeNextInterceptor(ctx, command); } int commandTopologyId = command.getTopologyId(); - int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId(); - boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1; - log.tracef("Command topology id is %d, current topology id is %d", commandTopologyId, currentTopologyId); - if (topologyChanged) { - throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + - commandTopologyId + ", got " + currentTopologyId); - } - - // At this point, we know that an entry located on this node that exists in the data container/store - // must also exist in the context. - ConsistentHash ch = command.getConsistentHash(); - Set requestedKeys = new HashSet<>(); - for (Object key : command.getKeys()) { - CacheEntry entry = ctx.lookupEntry(key); - if (entry == null || entry.isNull()) { - if (!isValueAvailableLocally(ch, key)) { - requestedKeys.add(key); - } else { - if (trace) { - log.tracef("Not doing a remote get for missing key %s since entry is " - + "mapped to current node (%s). Owners are %s", - toStr(key), rpcManager.getAddress(), ch.locateOwners(key)); + if (ctx.isOriginLocal()) { + int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId(); + boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1; + if (trace) { + log.tracef("Command topology id is %d, current topology id is %d", commandTopologyId, currentTopologyId); + } + if (topologyChanged) { + throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + + commandTopologyId + ", got " + currentTopologyId); + } + + // At this point, we know that an entry located on this node that exists in the data container/store + // must also exist in the context. + ConsistentHash ch = command.getConsistentHash(); + Set requestedKeys = new HashSet<>(); + for (Object key : command.getKeys()) { + CacheEntry entry = ctx.lookupEntry(key); + if (entry == null || entry.isNull()) { + if (!isValueAvailableLocally(ch, key)) { + requestedKeys.add(key); + } else { + if (trace) { + log.tracef("Not doing a remote get for missing key %s since entry is " + + "mapped to current node (%s). Owners are %s", + toStr(key), rpcManager.getAddress(), ch.locateOwners(key)); + } + // forWrite=true forces a non-null entry to be created, because we know this entry is local + wrapInternalCacheEntry(null, ctx, key, true, command); } - // forWrite=true forces a non-null entry to be created, because we know this entry is local - wrapInternalCacheEntry(null, ctx, key, true, command); } } - } - - boolean missingRemoteValues = false; - if (!requestedKeys.isEmpty()) { - if (trace) { - log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys); + + boolean missingRemoteValues = false; + if (!requestedKeys.isEmpty()) { + if (trace) { + log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys); + } + + Map justRetrieved = retrieveFromRemoteSources( + requestedKeys, ctx, command.getFlags()); + for (Object key : requestedKeys) { + if (!justRetrieved.containsKey(key)) { + missingRemoteValues = true; + } else { + wrapInternalCacheEntry(justRetrieved.get(key), ctx, key, true, command); + } + } } - - Map justRetrieved = retrieveFromRemoteSources( - requestedKeys, ctx, command.getFlags()); - for (Object key : requestedKeys) { - if (!justRetrieved.containsKey(key)) { - missingRemoteValues = true; - } else { - wrapInternalCacheEntry(justRetrieved.get(key), ctx, key, true, command); + + if (missingRemoteValues) { + throw new OutdatedTopologyException("Remote values are missing because of a topology change"); + } + return invokeNextInterceptor(ctx, command); + } else { + Map values = (Map) invokeNextInterceptor(ctx, command); + int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId(); + boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1; + // If the topology changed while invoking, this means we could have a null value + // but there really wasn't so we have to suspect that entry. + if (topologyChanged) { + if (trace) { + log.tracef("Command topology id is %d, after topology id is %d", + commandTopologyId, currentTopologyId); + } + Iterator> valueIterator = values.entrySet().iterator(); + while (valueIterator.hasNext()) { + Entry entry = valueIterator.next(); + if (entry.getValue() == null) { + valueIterator.remove(); + } } } + return values; } - - if (missingRemoteValues) { - throw new OutdatedTopologyException("Remote values are missing because of a topology change"); - } - - return invokeNextInterceptor(ctx, command); } @Override diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java index 474ff03976ab..6bff5b07b92c 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java @@ -38,8 +38,12 @@ import org.infinispan.util.logging.LogFactory; import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.TimeoutException; import static org.infinispan.commons.util.Util.toStr; @@ -162,75 +166,111 @@ private Object visitGetCommand(InvocationContext ctx, AbstractDataCommand comman @Override public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable { Map map = (Map) invokeNextInterceptor(ctx, command); - if (map == null) map = command.createMap(); - if (!ctx.isOriginLocal() || command.hasFlag(Flag.CACHE_MODE_LOCAL) + int commandTopologyId = command.getTopologyId(); + int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId(); + boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1; + // If the topology changed while invoking, this means we could have a null value + // but there really wasn't so we have to suspect that entry. + if (topologyChanged) { + if (trace) { + log.tracef("Command topology id is %d, after topology id is %d", + commandTopologyId, currentTopologyId); + } + Iterator> valueIterator = map.entrySet().iterator(); + while (valueIterator.hasNext()) { + Entry entry = valueIterator.next(); + if (entry.getValue() == null) { + valueIterator.remove(); + } + } + } + if (command.hasFlag(Flag.CACHE_MODE_LOCAL) || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) - || command.hasFlag(Flag.IGNORE_RETURN_VALUES)) { + || command.hasFlag(Flag.IGNORE_RETURN_VALUES) + || !ctx.isOriginLocal()) { return map; } + boolean missingRemoteValues = false; Set requestedKeys = new HashSet<>(command.getKeys().size()); for (Object key : command.getKeys()) { - //if the cache entry has the value lock flag set, skip the remote get. - CacheEntry entry = ctx.lookupEntry(key); - boolean skipRemoteGet = entry != null && entry.skipLookup(); - - // need to check in the context as well since a null retval is not necessarily an indication of the entry not being - // available. It could just have been removed in the same tx beforehand. Also don't bother with a remote get if - // the entry is mapped to the local node. - if (!skipRemoteGet && map.get(key) == null) { - // TODO: what about the deltaCompositeKey? It's kind of messy, where should we use the composite key - // and where the regular one? - //key = filterDeltaCompositeKey(key); - boolean shouldFetchFromRemote = false; - if (entry == null || entry.isNull()) { - ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash(); - shouldFetchFromRemote = !isValueAvailableLocally(ch, key); - if (!shouldFetchFromRemote && getLog().isTraceEnabled()) { - getLog().tracef("Not doing a remote get for key %s since entry is mapped to current node (%s) or is in L1. Owners are %s", toStr(key), rpcManager.getAddress(), ch.locateOwners(key)); + // If the local map already found the key that means we don't need to go + // remote for it + if (!map.containsKey(key)) { + //if the cache entry has the value lock flag set, skip the remote get. + CacheEntry entry = ctx.lookupEntry(key); + boolean skipRemoteGet = entry != null && entry.skipLookup(); + + // need to check in the context as well since a null retval is not necessarily an indication of the entry not being + // available. It could just have been removed in the same tx beforehand. Also don't bother with a remote get if + // the entry is mapped to the local node. + if (!skipRemoteGet) { + // TODO: what about the deltaCompositeKey? It's kind of messy, where should we use the composite key + // and where the regular one? + //key = filterDeltaCompositeKey(key); + boolean shouldFetchFromRemote = false; + if (entry == null || entry.isNull()) { + ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash(); + shouldFetchFromRemote = !isValueAvailableLocally(ch, key); + if (!shouldFetchFromRemote && trace) { + getLog().tracef("Not performing remote lookup of key %s as we own it" + + " now, we didn't when we looked - will have to retry command after", key); + } } - } - if (shouldFetchFromRemote) { - requestedKeys.add(key); - } else if (!ctx.isEntryRemovedInContext(key)) { - // Try again locally in case if we now are an owner for a key - Object localValue = localGet(ctx, key, false, command, command.isReturnEntries()); - if (localValue != null) { - map.put(key, localValue); + if (shouldFetchFromRemote) { + requestedKeys.add(key); + } else if (!ctx.isEntryRemovedInContext(key)) { + // Try again locally in case if we now are an owner for a key + Object localValue = localGet(ctx, key, false, command, command.isReturnEntries()); + if (localValue != null) { + map.put(key, localValue); + } else { + // Just in case rerun it again + missingRemoteValues = true; + } } } } } + if (!requestedKeys.isEmpty()) { if (trace) { log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys); } - Map previouslyRetrieved = command.getRemotelyFetched(); + Map justRetrieved = retrieveFromRemoteSources( requestedKeys, ctx, command.getFlags()); + Map previouslyRetrieved = command.getRemotelyFetched(); if (previouslyRetrieved != null) { previouslyRetrieved.putAll(justRetrieved); } else { command.setRemotelyFetched(justRetrieved); } - for (Entry entry : justRetrieved.entrySet()) { - Object key = entry.getKey(); - InternalCacheEntry value = entry.getValue(); - map.put(entry.getKey(), command.isReturnEntries() ? value : value != null ? value.getValue() : null); - if (useClusteredWriteSkewCheck && ctx.isInTxScope()) { - ((TxInvocationContext)ctx).getCacheTransaction().putLookedUpRemoteVersion( - key, value.getMetadata().version()); - } - - if (!ctx.replaceValue(key, value)) { - ctx.putLookedUpEntry(key, value); - if (ctx.isInTxScope()) { - ((TxInvocationContext) ctx).getCacheTransaction().replaceVersionRead( + for (Object key : requestedKeys) { + if (!justRetrieved.containsKey(key)) { + missingRemoteValues = true; + } else { + InternalCacheEntry value = justRetrieved.get(key); + if (useClusteredWriteSkewCheck && ctx.isInTxScope()) { + ((TxInvocationContext)ctx).getCacheTransaction().putLookedUpRemoteVersion( key, value.getMetadata().version()); } + + if (!ctx.replaceValue(key, value)) { + ctx.putLookedUpEntry(key, value); + if (ctx.isInTxScope()) { + ((TxInvocationContext) ctx).getCacheTransaction().replaceVersionRead( + key, value.getMetadata().version()); + } + } + map.put(key, command.isReturnEntries() ? value : value != null ? value.getValue() : null); } } } + + if (missingRemoteValues) { + throw new OutdatedTopologyException("Remote values are missing because of a topology change"); + } return map; } diff --git a/core/src/main/java/org/infinispan/interceptors/locking/NonTransactionalLockingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/locking/NonTransactionalLockingInterceptor.java index 7556676d182a..70b27180299f 100644 --- a/core/src/main/java/org/infinispan/interceptors/locking/NonTransactionalLockingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/locking/NonTransactionalLockingInterceptor.java @@ -2,6 +2,7 @@ import org.infinispan.InvalidCacheUsageException; import org.infinispan.commands.DataCommand; +import org.infinispan.commands.read.GetAllCommand; import org.infinispan.commands.write.DataWriteCommand; import org.infinispan.commands.write.PutMapCommand; import org.infinispan.context.InvocationContext; @@ -40,7 +41,7 @@ protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand c } @Override - public Object visitGetManyCommand(InvocationContext ctx, GetManyCommand command) throws Throwable { + public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable { assertNonTransactional(ctx); try { return invokeNextInterceptor(ctx, command); diff --git a/core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java index 67938605e75d..9be22cc023a5 100644 --- a/core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java @@ -4,9 +4,6 @@ import org.infinispan.commands.DataCommand; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.control.LockControlCommand; -import org.infinispan.commands.read.AbstractDataCommand; -import org.infinispan.commands.read.GetCacheEntryCommand; -import org.infinispan.commands.read.GetKeyValueCommand; import org.infinispan.commands.read.GetAllCommand; import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand; import org.infinispan.commands.tx.PrepareCommand; @@ -24,8 +21,8 @@ import org.infinispan.util.logging.LogFactory; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; -import java.util.Set; /** * Locking interceptor to be used by pessimistic caches. diff --git a/core/src/main/java/org/infinispan/partitionhandling/impl/PartitionHandlingInterceptor.java b/core/src/main/java/org/infinispan/partitionhandling/impl/PartitionHandlingInterceptor.java index aeee03e0c068..25f98c1bc0df 100644 --- a/core/src/main/java/org/infinispan/partitionhandling/impl/PartitionHandlingInterceptor.java +++ b/core/src/main/java/org/infinispan/partitionhandling/impl/PartitionHandlingInterceptor.java @@ -1,5 +1,10 @@ package org.infinispan.partitionhandling.impl; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + import org.infinispan.commands.LocalFlagAffectedCommand; import org.infinispan.commands.read.AbstractDataCommand; import org.infinispan.commands.read.EntryRetrievalCommand; diff --git a/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java b/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java index d8383f314af1..562d80eaf7d8 100644 --- a/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java +++ b/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java @@ -554,7 +554,7 @@ private static RspList processCalls(Map comm } // a get() on each future will block till that call completes. - TimeService timeService = card.gcr.getTimeService(); + TimeService timeService = card.timeService; long waitTime = timeService.expectedEndTime(timeout, MILLISECONDS); for (Map.Entry> entry : futures.entrySet()) { Address target = entry.getKey(); diff --git a/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java b/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java index 68c0d97ed55d..0fa6d1170272 100644 --- a/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java +++ b/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java @@ -62,6 +62,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; diff --git a/core/src/main/java/org/infinispan/util/logging/Log.java b/core/src/main/java/org/infinispan/util/logging/Log.java index 3f42dea4f13d..5770f30d1bd6 100644 --- a/core/src/main/java/org/infinispan/util/logging/Log.java +++ b/core/src/main/java/org/infinispan/util/logging/Log.java @@ -1309,5 +1309,5 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout, void xaResourceEndFailed(XAResource resource, @Cause Throwable t); @Message(value = "Keys '%s' are not available. Not all owners are in this partition", id = 358) - AvailabilityException degradedModeKeysUnavailable(Collection keys); + AvailabilityException degradedModeKeysUnavailable(Collection keys); } diff --git a/core/src/test/java/org/infinispan/commands/GetAllCommandStressTest.java b/core/src/test/java/org/infinispan/commands/GetAllCommandStressTest.java index e4f7d101e5e8..e60f9c8e39cf 100644 --- a/core/src/test/java/org/infinispan/commands/GetAllCommandStressTest.java +++ b/core/src/test/java/org/infinispan/commands/GetAllCommandStressTest.java @@ -77,7 +77,6 @@ protected EmbeddedCacheManager addClusterEnabledCacheManager(TransportFlags flag return cm; } - @Test(invocationCount=100) public void testStressNodesLeavingWhileMultipleIterators() throws InterruptedException, ExecutionException, TimeoutException { final Map masterValues = new HashMap(); final Set[] keys = new Set[THREAD_WORKER_COUNT];