Skip to content

Commit

Permalink
ISPN-7509 TotalOrderStateTransferInterceptor doesn't handle OutdatedT…
Browse files Browse the repository at this point in the history
…opologyException for read commands
  • Loading branch information
pruivo authored and ryanemerson committed Apr 21, 2017
1 parent cb3301e commit 92abcc7
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 108 deletions.
Expand Up @@ -8,11 +8,16 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction; import java.util.function.BiFunction;


import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.TopologyAffectedCommand; import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand; import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.KnownComponentNames; import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName; import org.infinispan.factories.annotations.ComponentName;
Expand Down Expand Up @@ -41,10 +46,11 @@
*/ */
public abstract class BaseStateTransferInterceptor extends DDAsyncInterceptor { public abstract class BaseStateTransferInterceptor extends DDAsyncInterceptor {
private final boolean trace = getLog().isTraceEnabled(); private final boolean trace = getLog().isTraceEnabled();
private final InvocationFinallyFunction handleReadCommandReturn = this::handleReadCommandReturn;


protected StateTransferManager stateTransferManager; private StateTransferManager stateTransferManager;
protected StateTransferLock stateTransferLock; protected StateTransferLock stateTransferLock;
protected Executor remoteExecutor; private Executor remoteExecutor;
private DistributionManager distributionManager; private DistributionManager distributionManager;
private ScheduledExecutorService timeoutExecutor; private ScheduledExecutorService timeoutExecutor;


Expand Down Expand Up @@ -128,7 +134,7 @@ protected final int currentTopologyId() {
return cacheTopology == null ? -1 : cacheTopology.getTopologyId(); return cacheTopology == null ? -1 : cacheTopology.getTopologyId();
} }


protected final void updateTopologyId(TopologyAffectedCommand command) throws InterruptedException { protected final void updateTopologyId(TopologyAffectedCommand command) {
// set the topology id if it was not set before (ie. this is local command) // set the topology id if it was not set before (ie. this is local command)
// TODO Make tx commands extend FlagAffectedCommand so we can use CACHE_MODE_LOCAL in TransactionTable.cleanupStaleTransactions // TODO Make tx commands extend FlagAffectedCommand so we can use CACHE_MODE_LOCAL in TransactionTable.cleanupStaleTransactions
if (command.getTopologyId() == -1) { if (command.getTopologyId() == -1) {
Expand All @@ -141,7 +147,7 @@ protected final void updateTopologyId(TopologyAffectedCommand command) throws In


protected <T extends VisitableCommand> Object retryWhenDone(CompletableFuture<Void> future, int topologyId, protected <T extends VisitableCommand> Object retryWhenDone(CompletableFuture<Void> future, int topologyId,
InvocationContext ctx, T command, InvocationContext ctx, T command,
InvocationFinallyFunction callback) throws Throwable { InvocationFinallyFunction callback) {
if (future.isDone()) { if (future.isDone()) {
getLog().tracef("Retrying command %s for topology %d", command, topologyId); getLog().tracef("Retrying command %s for topology %d", command, topologyId);
return invokeNextAndHandle(ctx, command, callback); return invokeNextAndHandle(ctx, command, callback);
Expand All @@ -159,6 +165,88 @@ protected <T extends VisitableCommand> Object retryWhenDone(CompletableFuture<Vo
} }
} }


@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
return handleReadCommand(ctx, command);
}

@Override
public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command)
throws Throwable {
return handleReadCommand(ctx, command);
}

@Override
public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
return handleReadCommand(ctx, command);
}

protected <C extends VisitableCommand & TopologyAffectedCommand & FlagAffectedCommand> Object handleReadCommand(
InvocationContext ctx, C command) {
return isLocalOnly(command) ? invokeNext(ctx, command) :
updateAndInvokeNextRead(ctx, command);
}

private <C extends VisitableCommand & TopologyAffectedCommand> Object updateAndInvokeNextRead(InvocationContext ctx, C command) {
updateTopologyId(command);
return invokeNextAndHandle(ctx, command,handleReadCommandReturn);
}

private Object handleReadCommandReturn(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable t)
throws Throwable {
if (t == null)
return rv;

Throwable ce = t;
while (ce instanceof RemoteException) {
ce = ce.getCause();
}
final CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
int currentTopologyId = cacheTopology == null ? -1 : cacheTopology.getTopologyId();
TopologyAffectedCommand cmd = (TopologyAffectedCommand) rCommand;
if (ce instanceof SuspectException) {
if (trace)
getLog().tracef("Retrying command because of suspected node, current topology is %d: %s",
currentTopologyId, rCommand);
// It is possible that current topology is actual but the view still contains a node that's about to leave;
// a broadcast to all nodes then can end with suspect exception, but we won't get any new topology.
// An example of this situation is when a node sends leave - topology can be installed before the new view.
// To prevent suspect exceptions use SYNCHRONOUS_IGNORE_LEAVERS response mode.
if (cacheTopology != null && currentTopologyId == cmd.getTopologyId() && !cacheTopology.getActualMembers().contains(((SuspectException) ce).getSuspect())) {
// TODO: provide a test case
throw new IllegalStateException("Command was not sent with SYNCHRONOUS_IGNORE_LEAVERS?");
}
} else if (ce instanceof OutdatedTopologyException) {
if (trace)
getLog().tracef("Retrying command because of topology change, current topology is %d: %s",
currentTopologyId, cmd);
} else {
throw t;
}
// We increment the topology to wait for the next topology.
// Without this, we could retry the command too fast and we could get the OutdatedTopologyException again.
int newTopologyId = getNewTopologyId(ce, currentTopologyId, cmd);
cmd.setTopologyId(newTopologyId);
((FlagAffectedCommand)rCommand).addFlags(FlagBitSets.COMMAND_RETRY);
CompletableFuture<Void> topologyFuture = stateTransferLock.topologyFuture(newTopologyId);
return retryWhenDone(topologyFuture, newTopologyId, rCtx, rCommand, handleReadCommandReturn);
}

protected int getNewTopologyId(Throwable ce, int currentTopologyId, TopologyAffectedCommand command) {
int requestedTopologyId = command.getTopologyId() + 1;
if (ce instanceof OutdatedTopologyException) {
OutdatedTopologyException ote = (OutdatedTopologyException) ce;
if (ote.requestedTopologyId >= 0) {
requestedTopologyId = ote.requestedTopologyId;
}
}
return Math.max(currentTopologyId, requestedTopologyId);
}

protected boolean isLocalOnly(FlagAffectedCommand command) {
return command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL);
}

protected abstract Log getLog(); protected abstract Log getLog();


private static class CancellableRetry<T extends VisitableCommand> implements BiFunction<Void, Throwable, Void>, Runnable { private static class CancellableRetry<T extends VisitableCommand> implements BiFunction<Void, Throwable, Void>, Runnable {
Expand All @@ -180,7 +268,7 @@ private static class CancellableRetry<T extends VisitableCommand> implements BiF
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile Object timeoutFuture; private volatile Object timeoutFuture;


public CancellableRetry(T command, int topologyId) { CancellableRetry(T command, int topologyId) {
this.command = command; this.command = command;
this.topologyId = topologyId; this.topologyId = topologyId;
} }
Expand Down
Expand Up @@ -65,6 +65,7 @@ private Object localPrepare(TxInvocationContext ctx, PrepareCommand command) thr


private Object handleLocalPrepareReturn(InvocationContext ctx, VisitableCommand command, Throwable t) private Object handleLocalPrepareReturn(InvocationContext ctx, VisitableCommand command, Throwable t)
throws Throwable { throws Throwable {
assert t != null;
// If we receive a RetryPrepareException it was because the prepare was delivered during a state transfer. // If we receive a RetryPrepareException it was because the prepare was delivered during a state transfer.
// Remember that the REBALANCE_START and CH_UPDATE are totally ordered with the prepares and the // Remember that the REBALANCE_START and CH_UPDATE are totally ordered with the prepares and the
// prepares are unblocked after the rebalance has finished. // prepares are unblocked after the rebalance has finished.
Expand Down
Expand Up @@ -10,9 +10,6 @@
import org.infinispan.commands.functional.ReadOnlyManyCommand; import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand; import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand; import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand; import org.infinispan.commands.tx.RollbackCommand;
Expand All @@ -30,14 +27,12 @@
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets; import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvocationFinallyFunction; import org.infinispan.interceptors.InvocationFinallyFunction;
import org.infinispan.interceptors.impl.BaseStateTransferInterceptor; import org.infinispan.interceptors.impl.BaseStateTransferInterceptor;
import org.infinispan.remoting.RemoteException; import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.responses.UnsureResponse; import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.SuspectException; import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


Expand All @@ -61,18 +56,10 @@ public class StateTransferInterceptor extends BaseStateTransferInterceptor {
private static final Log log = LogFactory.getLog(StateTransferInterceptor.class); private static final Log log = LogFactory.getLog(StateTransferInterceptor.class);
private static final boolean trace = log.isTraceEnabled(); private static final boolean trace = log.isTraceEnabled();


private StateTransferManager stateTransferManager;

private final InvocationFinallyFunction handleReadCommandReturn = this::handleReadCommandReturn;
private final InvocationFinallyFunction handleTxReturn = this::handleTxReturn; private final InvocationFinallyFunction handleTxReturn = this::handleTxReturn;
private final InvocationFinallyFunction handleTxWriteReturn = this::handleTxWriteReturn; private final InvocationFinallyFunction handleTxWriteReturn = this::handleTxWriteReturn;
private final InvocationFinallyFunction handleNonTxWriteReturn = this::handleNonTxWriteReturn; private final InvocationFinallyFunction handleNonTxWriteReturn = this::handleNonTxWriteReturn;


@Inject
public void init(StateTransferManager stateTransferManager) {
this.stateTransferManager = stateTransferManager;
}

@Override @Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command)
throws Throwable { throws Throwable {
Expand Down Expand Up @@ -160,73 +147,6 @@ public Object visitEvictCommand(InvocationContext ctx, EvictCommand command)
return invokeNext(ctx, command); return invokeNext(ctx, command);
} }


@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
return handleReadCommand(ctx, command);
}

@Override
public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command)
throws Throwable {
return handleReadCommand(ctx, command);
}

@Override
public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
return handleReadCommand(ctx, command);
}

private <C extends VisitableCommand & TopologyAffectedCommand & FlagAffectedCommand> Object handleReadCommand(InvocationContext ctx, C command) throws Throwable {
return isLocalOnly(command) ? invokeNext(ctx, command) :
updateAndInvokeNextRead(ctx, command);
}

private <C extends VisitableCommand & TopologyAffectedCommand> Object updateAndInvokeNextRead(InvocationContext ctx, C command)
throws InterruptedException {
updateTopologyId(command);
return invokeNextAndHandle(ctx, command,handleReadCommandReturn);
}

private Object handleReadCommandReturn(InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable t)
throws Throwable {
if (t == null)
return rv;

Throwable ce = t;
while (ce instanceof RemoteException) {
ce = ce.getCause();
}
final CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
int currentTopologyId = cacheTopology == null ? -1 : cacheTopology.getTopologyId();
TopologyAffectedCommand cmd = (TopologyAffectedCommand) rCommand;
if (ce instanceof SuspectException) {
if (trace)
log.tracef("Retrying command because of suspected node, current topology is %d: %s",
currentTopologyId, rCommand);
// It is possible that current topology is actual but the view still contains a node that's about to leave;
// a broadcast to all nodes then can end with suspect exception, but we won't get any new topology.
// An example of this situation is when a node sends leave - topology can be installed before the new view.
// To prevent suspect exceptions use SYNCHRONOUS_IGNORE_LEAVERS response mode.
if (currentTopologyId == cmd.getTopologyId() && !cacheTopology.getActualMembers().contains(((SuspectException) ce).getSuspect())) {
// TODO: provide a test case
throw new IllegalStateException("Command was not sent with SYNCHRONOUS_IGNORE_LEAVERS?");
}
} else if (ce instanceof OutdatedTopologyException) {
if (trace)
log.tracef("Retrying command because of topology change, current topology is %d: %s",
currentTopologyId, cmd);
} else {
throw t;
}
// We increment the topology to wait for the next topology.
// Without this, we could retry the command too fast and we could get the OutdatedTopologyException again.
int newTopologyId = getNewTopologyId(ce, currentTopologyId, cmd);
cmd.setTopologyId(newTopologyId);
((FlagAffectedCommand)rCommand).addFlags(FlagBitSets.COMMAND_RETRY);
CompletableFuture<Void> topologyFuture = stateTransferLock.topologyFuture(newTopologyId);
return retryWhenDone(topologyFuture, newTopologyId, rCtx, rCommand, handleReadCommandReturn);
}

@Override @Override
public Object visitReadWriteKeyValueCommand(InvocationContext ctx, public Object visitReadWriteKeyValueCommand(InvocationContext ctx,
ReadWriteKeyValueCommand command) throws Throwable { ReadWriteKeyValueCommand command) throws Throwable {
Expand All @@ -253,7 +173,7 @@ public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyComman
* Special processing required for transaction commands. * Special processing required for transaction commands.
* *
*/ */
private Object handleTxCommand(TxInvocationContext ctx, TransactionBoundaryCommand command) throws Throwable { private Object handleTxCommand(TxInvocationContext ctx, TransactionBoundaryCommand command) {
if (trace) log.tracef("handleTxCommand for command %s, origin %s", command, getOrigin(ctx)); if (trace) log.tracef("handleTxCommand for command %s, origin %s", command, getOrigin(ctx));
updateTopologyId(command); updateTopologyId(command);


Expand Down Expand Up @@ -302,17 +222,15 @@ private Object handleTxReturn(InvocationContext ctx,
return rv; return rv;
} }


protected Object handleWriteCommand(InvocationContext ctx, WriteCommand command) private Object handleWriteCommand(InvocationContext ctx, WriteCommand command) {
throws Throwable {
if (ctx.isInTxScope()) { if (ctx.isInTxScope()) {
return handleTxWriteCommand(ctx, command); return handleTxWriteCommand(ctx, command);
} else { } else {
return handleNonTxWriteCommand(ctx, command); return handleNonTxWriteCommand(ctx, command);
} }
} }


private Object handleTxWriteCommand(InvocationContext ctx, WriteCommand command) private Object handleTxWriteCommand(InvocationContext ctx, WriteCommand command) {
throws Throwable {
if (trace) log.tracef("handleTxWriteCommand for command %s, origin %s", command, ctx.getOrigin()); if (trace) log.tracef("handleTxWriteCommand for command %s, origin %s", command, ctx.getOrigin());


if (isLocalOnly(command)) { if (isLocalOnly(command)) {
Expand Down Expand Up @@ -359,8 +277,7 @@ private Object handleTxWriteReturn(InvocationContext rCtx, VisitableCommand rCom
* But we only retry on the originator, and only if the command doesn't have * But we only retry on the originator, and only if the command doesn't have
* the {@code CACHE_MODE_LOCAL} flag. * the {@code CACHE_MODE_LOCAL} flag.
*/ */
private Object handleNonTxWriteCommand(InvocationContext ctx, WriteCommand command) private Object handleNonTxWriteCommand(InvocationContext ctx, WriteCommand command) {
throws Throwable {
if (trace) log.tracef("handleNonTxWriteCommand for command %s, topology id %d", command, command.getTopologyId()); if (trace) log.tracef("handleNonTxWriteCommand for command %s, topology id %d", command, command.getTopologyId());


if (isLocalOnly(command)) { if (isLocalOnly(command)) {
Expand Down Expand Up @@ -406,17 +323,6 @@ private Object handleNonTxWriteReturn(InvocationContext rCtx,
return retryWhenDone(transactionDataFuture, newTopologyId, rCtx, writeCommand, handleNonTxWriteReturn); return retryWhenDone(transactionDataFuture, newTopologyId, rCtx, writeCommand, handleNonTxWriteReturn);
} }


private int getNewTopologyId(Throwable ce, int currentTopologyId, TopologyAffectedCommand command) {
int requestedTopologyId = command.getTopologyId() + 1;
if (ce instanceof OutdatedTopologyException) {
OutdatedTopologyException ote = (OutdatedTopologyException) ce;
if (ote.requestedTopologyId >= 0) {
requestedTopologyId = ote.requestedTopologyId;
}
}
return Math.max(currentTopologyId, requestedTopologyId);
}

@Override @Override
public Object handleDefault(InvocationContext ctx, VisitableCommand command) public Object handleDefault(InvocationContext ctx, VisitableCommand command)
throws Throwable { throws Throwable {
Expand All @@ -428,7 +334,7 @@ public Object handleDefault(InvocationContext ctx, VisitableCommand command)
} }


private Object handleTopologyAffectedCommand(InvocationContext ctx, private Object handleTopologyAffectedCommand(InvocationContext ctx,
VisitableCommand command, Address origin) throws Throwable { VisitableCommand command, Address origin) {
if (trace) log.tracef("handleTopologyAffectedCommand for command %s, origin %s", command, origin); if (trace) log.tracef("handleTopologyAffectedCommand for command %s, origin %s", command, origin);


if (isLocalOnly((FlagAffectedCommand) command)) { if (isLocalOnly((FlagAffectedCommand) command)) {
Expand All @@ -439,10 +345,6 @@ private Object handleTopologyAffectedCommand(InvocationContext ctx,
return invokeNext(ctx, command); return invokeNext(ctx, command);
} }


private boolean isLocalOnly(FlagAffectedCommand command) {
return command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL);
}

@Override @Override
protected Log getLog() { protected Log getLog() {
return log; return log;
Expand Down

0 comments on commit 92abcc7

Please sign in to comment.