Skip to content

Commit

Permalink
ISPN-6938 Refactor remoteGet in distribution interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa authored and danberindei committed Dec 5, 2016
1 parent 8963406 commit f81dfd3
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 119 deletions.
Expand Up @@ -23,9 +23,12 @@
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.ValueMatcher;
Expand All @@ -45,6 +48,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.BasicInvocationStage;
import org.infinispan.interceptors.InvocationStage;
import org.infinispan.interceptors.impl.ClusteringInterceptor;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.remoting.RpcException;
Expand Down Expand Up @@ -314,14 +318,14 @@ private void updateWithValues(List<?> keys, Response r, Map<Object, InternalCach
}
}

protected final BasicInvocationStage handleNonTxWriteCommand(InvocationContext ctx, DataWriteCommand command)
protected final BasicInvocationStage handleNonTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command)
throws Throwable {
if (ctx.isInTxScope()) {
throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context");
}

// see if we need to load values from remote sources first
CompletableFuture<?> remoteGetFuture = remoteGetBeforeWrite(ctx, command, command.getKey());
CompletableFuture<?> remoteGetFuture = remoteGetBeforeWrite(ctx, command, command.getKey(), true);
if (remoteGetFuture != null) {
return invokeNextAsync(ctx, command, remoteGetFuture).thenCompose(this::handleLocalResult);
} else {
Expand Down Expand Up @@ -730,7 +734,6 @@ public void countDown() {
}
}


/**
* @return Whether a remote get is needed to obtain the previous values of the affected entries.
*/
Expand All @@ -740,8 +743,52 @@ protected boolean valueIsMissing(CacheEntry entry) {
return entry == null || (entry.isNull() && !entry.isRemoved() && !entry.skipLookup());
}

protected abstract CompletableFuture<?> remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key)
throws Throwable;
private InvocationStage visitGetCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable {
if (!ctx.isOriginLocal())
return invokeNext(ctx, command);

Object key = command.getKey();
CacheEntry entry = ctx.lookupEntry(key);
if (valueIsMissing(entry)) {
if (readNeedsRemoteValue(ctx, command)) {
if (trace)
log.tracef("Doing a remote get for key %s", key);
CompletableFuture<?> getFuture = remoteGet(ctx, command, command.getKey(), false);
return invokeNextAsync(ctx, command, getFuture);
}
}
return invokeNext(ctx, command);
}

@Override
public BasicInvocationStage visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command)
throws Throwable {
return visitGetCommand(ctx, command);
}

@Override
public BasicInvocationStage visitGetCacheEntryCommand(InvocationContext ctx,
GetCacheEntryCommand command) throws Throwable {
return visitGetCommand(ctx, command);
}

protected abstract CompletableFuture<?> remoteGet(InvocationContext ctx, AbstractDataCommand command,
Object key, boolean isWrite) throws Throwable;

protected CompletableFuture<?> remoteGetBeforeWrite(InvocationContext ctx, AbstractDataWriteCommand command,
Object key, boolean isWrite) throws Throwable {
CacheEntry entry = ctx.lookupEntry(key);
if (valueIsMissing(entry) && writeNeedsRemoteValue(ctx, command, key)) {
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
int cmdTopology = command.getTopologyId();
if (currentTopologyId != cmdTopology && cmdTopology != -1) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
cmdTopology + ", got " + currentTopologyId);
}
return remoteGet(ctx, command, key, isWrite);
}
return null;
}

/**
* @return {@code true} if the value is not available on the local node and a read command is allowed to
Expand Down
Expand Up @@ -25,8 +25,6 @@
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
Expand All @@ -35,7 +33,6 @@
import org.infinispan.commons.CacheException;
import org.infinispan.container.EntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
Expand Down Expand Up @@ -76,42 +73,6 @@ public class NonTxDistributionInterceptor extends BaseDistributionInterceptor {
private static Log log = LogFactory.getLog(NonTxDistributionInterceptor.class);
private static final boolean trace = log.isTraceEnabled();

@Override
public BasicInvocationStage visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws
Throwable {
return visitGetCommand(ctx, command);
}

@Override
public BasicInvocationStage visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command)
throws Throwable {
return visitGetCommand(ctx, command);
}

private <T extends AbstractDataCommand> BasicInvocationStage visitGetCommand(
InvocationContext ctx, T command) throws Throwable {
if (!ctx.isOriginLocal())
return invokeNext(ctx, command);

Object key = command.getKey();
CacheEntry entry = ctx.lookupEntry(key);
if (valueIsMissing(entry)) {
if (readNeedsRemoteValue(ctx, command)) {
if (trace)
log.tracef("Doing a remote get for key %s", key);
CompletableFuture<InternalCacheEntry> remoteFuture = retrieveFromProperSource(key, ctx, command, false);
return invokeNextAsync(ctx, command, remoteFuture.thenAccept(remoteEntry -> handleRemoteEntry(ctx, key, remoteEntry)));
}
}
return invokeNext(ctx, command);
}

private void handleRemoteEntry(InvocationContext ctx, Object key, InternalCacheEntry remoteEntry) {
if (remoteEntry != null) {
entryFactory.wrapExternalEntry(ctx, key, remoteEntry, EntryFactory.Wrap.STORE, false);
}
}

@Override
public BasicInvocationStage visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws
Throwable {
Expand Down Expand Up @@ -737,27 +698,12 @@ public BasicInvocationStage visitWriteOnlyKeyCommand(InvocationContext ctx, Writ
}

@Override
protected CompletableFuture<?> remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key)
throws Throwable {
CacheEntry entry = ctx.lookupEntry(key);
if (!valueIsMissing(entry)) {
return null;
}
CompletableFuture<InternalCacheEntry> remoteFuture;
if (writeNeedsRemoteValue(ctx, command, key)) {
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
int cmdTopology = command.getTopologyId();
boolean topologyChanged = currentTopologyId != cmdTopology && cmdTopology != -1;
if (topologyChanged) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
cmdTopology + ", got " + currentTopologyId);
protected CompletableFuture<?> remoteGet(InvocationContext ctx, AbstractDataCommand command, Object key, boolean isWrite) throws Exception {
return retrieveFromProperSource(key, ctx, command, isWrite).thenAccept(remoteEntry -> {
if (remoteEntry != null) {
entryFactory.wrapExternalEntry(ctx, key, remoteEntry, EntryFactory.Wrap.STORE, false);
}
remoteFuture = retrieveFromProperSource(key, ctx, command, false);
return remoteFuture.thenAccept(remoteEntry -> {
handleRemoteEntry(ctx, key, remoteEntry);
});
}
return null;
});
}

@Override
Expand Down
Expand Up @@ -10,16 +10,14 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.TransactionBoundaryCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
Expand All @@ -28,8 +26,6 @@
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.EntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.EntryVersionsMap;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
Expand Down Expand Up @@ -110,31 +106,6 @@ public BasicInvocationStage visitPutMapCommand(InvocationContext ctx, PutMapComm
return invokeNext(ctx, command);
}

@Override
public BasicInvocationStage visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
return visitGetCommand(ctx, command);
}

@Override
public BasicInvocationStage visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command)
throws Throwable {
return visitGetCommand(ctx, command);
}

private InvocationStage visitGetCommand(InvocationContext ctx, AbstractDataCommand command)
throws Throwable {
Object key = command.getKey();
CacheEntry entry = ctx.lookupEntry(key);
// If the cache entry has the value lock flag set, skip the remote get.
if (ctx.isOriginLocal() && valueIsMissing(entry)) {
if (readNeedsRemoteValue(ctx, command)) {
remoteGet(ctx, key, false, command);
}
}

return invokeNext(ctx, command);
}

@Override
public BasicInvocationStage visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command)
throws Throwable {
Expand Down Expand Up @@ -294,11 +265,11 @@ private boolean checkCacheNotFoundResponseInPartitionHandling(TransactionBoundar
* If we are within one transaction we won't do any replication as replication would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication either.
*/
private InvocationStage handleTxWriteCommand(InvocationContext ctx, WriteCommand command, Object key)
private InvocationStage handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command, Object key)
throws Throwable {
// see if we need to load values from remote sources first
try {
CompletableFuture<?> remoteGetFuture = remoteGetBeforeWrite(ctx, command, key);
CompletableFuture<?> remoteGetFuture = remoteGetBeforeWrite(ctx, command, key, true);
InvocationStage stage;
if (remoteGetFuture == null) {
stage = invokeNext(ctx, command);
Expand Down Expand Up @@ -337,30 +308,14 @@ protected boolean writeNeedsRemoteValue(InvocationContext ctx, WriteCommand comm
}

@Override
protected CompletableFuture<?> remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key)
throws Throwable {
CacheEntry entry = ctx.lookupEntry(key);
if (!valueIsMissing(entry)) {
// The entry already exists in the context, and it shouldn't be re-fetched
return null;
}
if (writeNeedsRemoteValue(ctx, command, key)) {
remoteGet(ctx, key, true, command);
}
return null;
}

protected InternalCacheEntry remoteGet(InvocationContext ctx, Object key, boolean isWrite,
FlagAffectedCommand command) throws Throwable {
protected CompletableFuture<?> remoteGet(InvocationContext ctx, AbstractDataCommand command, Object key, boolean isWrite) throws Throwable {
// attempt a remote lookup
InternalCacheEntry ice = retrieveFromProperSource(key, ctx, command, isWrite).get();

if (ice != null) {
EntryFactory.Wrap wrap = isWrite ? EntryFactory.Wrap.WRAP_NON_NULL : EntryFactory.Wrap.STORE;
entryFactory.wrapExternalEntry(ctx, key, ice, wrap, false);
return ice;
}
return null;
return retrieveFromProperSource(key, ctx, command, isWrite).thenAccept(ice -> {
if (ice != null) {
EntryFactory.Wrap wrap = isWrite ? EntryFactory.Wrap.WRAP_NON_NULL : EntryFactory.Wrap.STORE;
entryFactory.wrapExternalEntry(ctx, key, ice, wrap, false);
}
});
}

private RpcOptions createCommitRpcOptions() {
Expand Down
Expand Up @@ -757,6 +757,7 @@ protected boolean shouldCommitDuringPrepare(PrepareCommand command, TxInvocation
protected final void wrapEntriesForPrepare(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
if (!ctx.isOriginLocal() || command.isReplayEntryWrapping()) {
for (WriteCommand c : command.getModifications()) {
c.setTopologyId(command.getTopologyId());
InvocationStage visitorStage = (InvocationStage) c.acceptVisitor(ctx, entryWrappingVisitor);
if (visitorStage != null) {
// Wait for the sub-command to finish. If there was an exception, rethrow it.
Expand Down

0 comments on commit f81dfd3

Please sign in to comment.