Skip to content

Commit

Permalink
ISPN-2183 ISPN-2183 Add the ability to fetch a set of keys at once
Browse files Browse the repository at this point in the history
(getAll)

* Fixed compile issue after merge and also updated stress test
* Also added back in support for values not looked up in context
* Also added changes for tx cache getAll
  • Loading branch information
wburns authored and danberindei committed Apr 17, 2015
1 parent 633ab3e commit 0a43e8f
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 98 deletions.
Expand Up @@ -8,6 +8,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.infinispan.AdvancedCache;
Expand Down
Expand Up @@ -78,8 +78,11 @@ public Object perform(InvocationContext ctx) throws Throwable {
if (entry == null) {
if (trace) {
log.tracef("Entry for key %s not found", key);
if (ctx.isOriginLocal())
throw new IllegalStateException("All entries must exist in the context");
}
// We have to put null even if it isn't in the context. This is because
// context won't have a value for null unless it is repeatable read.
if (ch == null || ch.isKeyLocalToNode(localAddress, key)) {
map.put(key, null);
}
continue;
}
Expand Down
Expand Up @@ -23,7 +23,9 @@
import org.infinispan.iteration.EntryIterable;
import org.infinispan.metadata.Metadata;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand Down
Expand Up @@ -102,63 +102,85 @@ private Object computeGetReturn(InternalCacheEntry entry, boolean returnEntry) {
public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
|| command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES)
|| !ctx.isOriginLocal()) {
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
return invokeNextInterceptor(ctx, command);
}

int commandTopologyId = command.getTopologyId();
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1;
log.tracef("Command topology id is %d, current topology id is %d", commandTopologyId, currentTopologyId);
if (topologyChanged) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
commandTopologyId + ", got " + currentTopologyId);
}

// At this point, we know that an entry located on this node that exists in the data container/store
// must also exist in the context.
ConsistentHash ch = command.getConsistentHash();
Set<Object> requestedKeys = new HashSet<>();
for (Object key : command.getKeys()) {
CacheEntry entry = ctx.lookupEntry(key);
if (entry == null || entry.isNull()) {
if (!isValueAvailableLocally(ch, key)) {
requestedKeys.add(key);
} else {
if (trace) {
log.tracef("Not doing a remote get for missing key %s since entry is "
+ "mapped to current node (%s). Owners are %s",
toStr(key), rpcManager.getAddress(), ch.locateOwners(key));
if (ctx.isOriginLocal()) {
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1;
if (trace) {
log.tracef("Command topology id is %d, current topology id is %d", commandTopologyId, currentTopologyId);
}
if (topologyChanged) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
commandTopologyId + ", got " + currentTopologyId);
}

// At this point, we know that an entry located on this node that exists in the data container/store
// must also exist in the context.
ConsistentHash ch = command.getConsistentHash();
Set<Object> requestedKeys = new HashSet<>();
for (Object key : command.getKeys()) {
CacheEntry entry = ctx.lookupEntry(key);
if (entry == null || entry.isNull()) {
if (!isValueAvailableLocally(ch, key)) {
requestedKeys.add(key);
} else {
if (trace) {
log.tracef("Not doing a remote get for missing key %s since entry is "
+ "mapped to current node (%s). Owners are %s",
toStr(key), rpcManager.getAddress(), ch.locateOwners(key));
}
// forWrite=true forces a non-null entry to be created, because we know this entry is local
wrapInternalCacheEntry(null, ctx, key, true, command);
}
// forWrite=true forces a non-null entry to be created, because we know this entry is local
wrapInternalCacheEntry(null, ctx, key, true, command);
}
}
}

boolean missingRemoteValues = false;
if (!requestedKeys.isEmpty()) {
if (trace) {
log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys);

boolean missingRemoteValues = false;
if (!requestedKeys.isEmpty()) {
if (trace) {
log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys);
}

Map<Object, InternalCacheEntry> justRetrieved = retrieveFromRemoteSources(
requestedKeys, ctx, command.getFlags());
for (Object key : requestedKeys) {
if (!justRetrieved.containsKey(key)) {
missingRemoteValues = true;
} else {
wrapInternalCacheEntry(justRetrieved.get(key), ctx, key, true, command);
}
}
}

Map<Object, InternalCacheEntry> justRetrieved = retrieveFromRemoteSources(
requestedKeys, ctx, command.getFlags());
for (Object key : requestedKeys) {
if (!justRetrieved.containsKey(key)) {
missingRemoteValues = true;
} else {
wrapInternalCacheEntry(justRetrieved.get(key), ctx, key, true, command);

if (missingRemoteValues) {
throw new OutdatedTopologyException("Remote values are missing because of a topology change");
}
return invokeNextInterceptor(ctx, command);
} else {
Map<Object, Object> values = (Map<Object, Object>) invokeNextInterceptor(ctx, command);
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1;
// If the topology changed while invoking, this means we could have a null value
// but there really wasn't so we have to suspect that entry.
if (topologyChanged) {
if (trace) {
log.tracef("Command topology id is %d, after topology id is %d",
commandTopologyId, currentTopologyId);
}
Iterator<Entry<Object, Object>> valueIterator = values.entrySet().iterator();
while (valueIterator.hasNext()) {
Entry<Object, Object> entry = valueIterator.next();
if (entry.getValue() == null) {
valueIterator.remove();
}
}
}
return values;
}

if (missingRemoteValues) {
throw new OutdatedTopologyException("Remote values are missing because of a topology change");
}

return invokeNextInterceptor(ctx, command);
}

@Override
Expand Down
Expand Up @@ -38,8 +38,12 @@
import org.infinispan.util.logging.LogFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import static org.infinispan.commons.util.Util.toStr;
Expand Down Expand Up @@ -162,75 +166,111 @@ private Object visitGetCommand(InvocationContext ctx, AbstractDataCommand comman
@Override
public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
Map<Object, Object> map = (Map<Object, Object>) invokeNextInterceptor(ctx, command);
if (map == null) map = command.createMap();
if (!ctx.isOriginLocal() || command.hasFlag(Flag.CACHE_MODE_LOCAL)
int commandTopologyId = command.getTopologyId();
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
boolean topologyChanged = currentTopologyId != commandTopologyId && commandTopologyId != -1;
// If the topology changed while invoking, this means we could have a null value
// but there really wasn't so we have to suspect that entry.
if (topologyChanged) {
if (trace) {
log.tracef("Command topology id is %d, after topology id is %d",
commandTopologyId, currentTopologyId);
}
Iterator<Entry<Object, Object>> valueIterator = map.entrySet().iterator();
while (valueIterator.hasNext()) {
Entry<Object, Object> entry = valueIterator.next();
if (entry.getValue() == null) {
valueIterator.remove();
}
}
}
if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
|| command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES)
|| !ctx.isOriginLocal()) {
return map;
}

boolean missingRemoteValues = false;
Set<Object> requestedKeys = new HashSet<>(command.getKeys().size());
for (Object key : command.getKeys()) {
//if the cache entry has the value lock flag set, skip the remote get.
CacheEntry entry = ctx.lookupEntry(key);
boolean skipRemoteGet = entry != null && entry.skipLookup();

// need to check in the context as well since a null retval is not necessarily an indication of the entry not being
// available. It could just have been removed in the same tx beforehand. Also don't bother with a remote get if
// the entry is mapped to the local node.
if (!skipRemoteGet && map.get(key) == null) {
// TODO: what about the deltaCompositeKey? It's kind of messy, where should we use the composite key
// and where the regular one?
//key = filterDeltaCompositeKey(key);
boolean shouldFetchFromRemote = false;
if (entry == null || entry.isNull()) {
ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
shouldFetchFromRemote = !isValueAvailableLocally(ch, key);
if (!shouldFetchFromRemote && getLog().isTraceEnabled()) {
getLog().tracef("Not doing a remote get for key %s since entry is mapped to current node (%s) or is in L1. Owners are %s", toStr(key), rpcManager.getAddress(), ch.locateOwners(key));
// If the local map already found the key that means we don't need to go
// remote for it
if (!map.containsKey(key)) {
//if the cache entry has the value lock flag set, skip the remote get.
CacheEntry entry = ctx.lookupEntry(key);
boolean skipRemoteGet = entry != null && entry.skipLookup();

// need to check in the context as well since a null retval is not necessarily an indication of the entry not being
// available. It could just have been removed in the same tx beforehand. Also don't bother with a remote get if
// the entry is mapped to the local node.
if (!skipRemoteGet) {
// TODO: what about the deltaCompositeKey? It's kind of messy, where should we use the composite key
// and where the regular one?
//key = filterDeltaCompositeKey(key);
boolean shouldFetchFromRemote = false;
if (entry == null || entry.isNull()) {
ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
shouldFetchFromRemote = !isValueAvailableLocally(ch, key);
if (!shouldFetchFromRemote && trace) {
getLog().tracef("Not performing remote lookup of key %s as we own it"
+ " now, we didn't when we looked - will have to retry command after", key);
}
}
}
if (shouldFetchFromRemote) {
requestedKeys.add(key);
} else if (!ctx.isEntryRemovedInContext(key)) {
// Try again locally in case if we now are an owner for a key
Object localValue = localGet(ctx, key, false, command, command.isReturnEntries());
if (localValue != null) {
map.put(key, localValue);
if (shouldFetchFromRemote) {
requestedKeys.add(key);
} else if (!ctx.isEntryRemovedInContext(key)) {
// Try again locally in case if we now are an owner for a key
Object localValue = localGet(ctx, key, false, command, command.isReturnEntries());
if (localValue != null) {
map.put(key, localValue);
} else {
// Just in case rerun it again
missingRemoteValues = true;
}
}
}
}
}

if (!requestedKeys.isEmpty()) {
if (trace) {
log.tracef("Fetching entries for keys %s from remote nodes", requestedKeys);
}
Map<Object, InternalCacheEntry> previouslyRetrieved = command.getRemotelyFetched();

Map<Object, InternalCacheEntry> justRetrieved = retrieveFromRemoteSources(
requestedKeys, ctx, command.getFlags());
Map<Object, InternalCacheEntry> previouslyRetrieved = command.getRemotelyFetched();
if (previouslyRetrieved != null) {
previouslyRetrieved.putAll(justRetrieved);
} else {
command.setRemotelyFetched(justRetrieved);
}
for (Entry<Object, InternalCacheEntry> entry : justRetrieved.entrySet()) {
Object key = entry.getKey();
InternalCacheEntry value = entry.getValue();
map.put(entry.getKey(), command.isReturnEntries() ? value : value != null ? value.getValue() : null);
if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
((TxInvocationContext)ctx).getCacheTransaction().putLookedUpRemoteVersion(
key, value.getMetadata().version());
}

if (!ctx.replaceValue(key, value)) {
ctx.putLookedUpEntry(key, value);
if (ctx.isInTxScope()) {
((TxInvocationContext) ctx).getCacheTransaction().replaceVersionRead(
for (Object key : requestedKeys) {
if (!justRetrieved.containsKey(key)) {
missingRemoteValues = true;
} else {
InternalCacheEntry value = justRetrieved.get(key);
if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
((TxInvocationContext)ctx).getCacheTransaction().putLookedUpRemoteVersion(
key, value.getMetadata().version());
}

if (!ctx.replaceValue(key, value)) {
ctx.putLookedUpEntry(key, value);
if (ctx.isInTxScope()) {
((TxInvocationContext) ctx).getCacheTransaction().replaceVersionRead(
key, value.getMetadata().version());
}
}
map.put(key, command.isReturnEntries() ? value : value != null ? value.getValue() : null);
}
}
}

if (missingRemoteValues) {
throw new OutdatedTopologyException("Remote values are missing because of a topology change");
}
return map;
}

Expand Down
Expand Up @@ -2,6 +2,7 @@

import org.infinispan.InvalidCacheUsageException;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.context.InvocationContext;
Expand Down Expand Up @@ -40,7 +41,7 @@ protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand c
}

@Override
public Object visitGetManyCommand(InvocationContext ctx, GetManyCommand command) throws Throwable {
public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
assertNonTransactional(ctx);
try {
return invokeNextInterceptor(ctx, command);
Expand Down
Expand Up @@ -4,9 +4,6 @@
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.PrepareCommand;
Expand All @@ -24,8 +21,8 @@
import org.infinispan.util.logging.LogFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
* Locking interceptor to be used by pessimistic caches.
Expand Down
@@ -1,5 +1,10 @@
package org.infinispan.partitionhandling.impl;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.infinispan.commands.LocalFlagAffectedCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.EntryRetrievalCommand;
Expand Down
Expand Up @@ -554,7 +554,7 @@ private static RspList<Object> processCalls(Map<Address, ReplicableCommand> comm
}

// a get() on each future will block till that call completes.
TimeService timeService = card.gcr.getTimeService();
TimeService timeService = card.timeService;
long waitTime = timeService.expectedEndTime(timeout, MILLISECONDS);
for (Map.Entry<Address, Future<Object>> entry : futures.entrySet()) {
Address target = entry.getKey();
Expand Down
Expand Up @@ -62,6 +62,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down

0 comments on commit 0a43e8f

Please sign in to comment.