Skip to content

Commit

Permalink
ISPN-5623 Retried prepare commands do not wait for backup locks
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and danberindei committed Dec 7, 2015
1 parent 5f105f0 commit 0d37774
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 81 deletions.
Expand Up @@ -50,6 +50,7 @@ public class PrepareCommand extends AbstractTransactionBoundaryCommand implement
protected CacheNotifier notifier; protected CacheNotifier notifier;
protected RecoveryManager recoveryManager; protected RecoveryManager recoveryManager;
private transient boolean replayEntryWrapping = false; private transient boolean replayEntryWrapping = false;
protected boolean retriedCommand;


private static final WriteCommand[] EMPTY_WRITE_COMMAND_ARRAY = new WriteCommand[0]; private static final WriteCommand[] EMPTY_WRITE_COMMAND_ARRAY = new WriteCommand[0];


Expand Down Expand Up @@ -189,10 +190,11 @@ public byte getCommandId() {
public Object[] getParameters() { public Object[] getParameters() {
int numMods = modifications == null ? 0 : modifications.length; int numMods = modifications == null ? 0 : modifications.length;
int i = 0; int i = 0;
final int params = 3; final int params = 4;
Object[] retval = new Object[numMods + params]; Object[] retval = new Object[numMods + params];
retval[i++] = globalTx; retval[i++] = globalTx;
retval[i++] = onePhaseCommit; retval[i++] = onePhaseCommit;
retval[i++] = retriedCommand;
retval[i] = numMods; retval[i] = numMods;
if (numMods > 0) System.arraycopy(modifications, 0, retval, params, numMods); if (numMods > 0) System.arraycopy(modifications, 0, retval, params, numMods);
return retval; return retval;
Expand All @@ -204,6 +206,7 @@ public void setParameters(int commandId, Object[] args) {
int i = 0; int i = 0;
globalTx = (GlobalTransaction) args[i++]; globalTx = (GlobalTransaction) args[i++];
onePhaseCommit = (Boolean) args[i++]; onePhaseCommit = (Boolean) args[i++];
retriedCommand = (boolean) args[i++];
int numMods = (Integer) args[i++]; int numMods = (Integer) args[i++];
if (numMods > 0) { if (numMods > 0) {
modifications = new WriteCommand[numMods]; modifications = new WriteCommand[numMods];
Expand All @@ -224,6 +227,7 @@ public String toString() {
return "PrepareCommand {" + return "PrepareCommand {" +
"modifications=" + (modifications == null ? null : Arrays.asList(modifications)) + "modifications=" + (modifications == null ? null : Arrays.asList(modifications)) +
", onePhaseCommit=" + onePhaseCommit + ", onePhaseCommit=" + onePhaseCommit +
", retried=" + retriedCommand +
", " + super.toString(); ", " + super.toString();
} }


Expand Down Expand Up @@ -260,4 +264,11 @@ public boolean isReturnValueExpected() {
return false; return false;
} }


public boolean isRetriedCommand() {
return retriedCommand;
}

public void setRetriedCommand(boolean retriedCommand) {
this.retriedCommand = retriedCommand;
}
} }
Expand Up @@ -48,10 +48,11 @@ public byte getCommandId() {
public Object[] getParameters() { public Object[] getParameters() {
int numMods = modifications == null ? 0 : modifications.length; int numMods = modifications == null ? 0 : modifications.length;
int i = 0; int i = 0;
final int params = 4; final int params = 5;
Object[] retval = new Object[numMods + params]; Object[] retval = new Object[numMods + params];
retval[i++] = globalTx; retval[i++] = globalTx;
retval[i++] = onePhaseCommit; retval[i++] = onePhaseCommit;
retval[i++] = retriedCommand;
retval[i++] = versionsSeen; retval[i++] = versionsSeen;
retval[i++] = numMods; retval[i++] = numMods;
if (numMods > 0) System.arraycopy(modifications, 0, retval, params, numMods); if (numMods > 0) System.arraycopy(modifications, 0, retval, params, numMods);
Expand All @@ -64,6 +65,7 @@ public void setParameters(int commandId, Object[] args) {
int i = 0; int i = 0;
globalTx = (GlobalTransaction) args[i++]; globalTx = (GlobalTransaction) args[i++];
onePhaseCommit = (Boolean) args[i++]; onePhaseCommit = (Boolean) args[i++];
retriedCommand = (Boolean) args[i++];
versionsSeen = (EntryVersionsMap) args[i++]; versionsSeen = (EntryVersionsMap) args[i++];
int numMods = (Integer) args[i++]; int numMods = (Integer) args[i++];
if (numMods > 0) { if (numMods > 0) {
Expand All @@ -82,6 +84,7 @@ public String toString() {
return "VersionedPrepareCommand {" + return "VersionedPrepareCommand {" +
"modifications=" + (modifications == null ? null : Arrays.asList(modifications)) + "modifications=" + (modifications == null ? null : Arrays.asList(modifications)) +
", onePhaseCommit=" + onePhaseCommit + ", onePhaseCommit=" + onePhaseCommit +
", retried=" + retriedCommand +
", versionsSeen=" + versionsSeen + ", versionsSeen=" + versionsSeen +
", gtx=" + globalTx + ", gtx=" + globalTx +
", cacheName='" + cacheName + '\'' + ", cacheName='" + cacheName + '\'' +
Expand Down
Expand Up @@ -61,6 +61,11 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
final Collection<Object> keysToLock = command.getKeysToLock(); final Collection<Object> keysToLock = command.getKeysToLock();
((TxInvocationContext<?>) ctx).addAllAffectedKeys(command.getAffectedKeys()); ((TxInvocationContext<?>) ctx).addAllAffectedKeys(command.getAffectedKeys());
if (!keysToLock.isEmpty()) { if (!keysToLock.isEmpty()) {
if (command.isRetriedCommand() && ctx.isOriginLocal()) {
//clear backup locks for local and retried commands only. The remote commands clears the backup locks in PendingTxAction.
ctx.getCacheTransaction().cleanupBackupLocks();
keysToLock.removeAll(ctx.getLockedKeys()); //already locked!
}
Collection<Object> lockedKeys = lockAllOrRegisterBackupLock(ctx, keysToLock, Collection<Object> lockedKeys = lockAllOrRegisterBackupLock(ctx, keysToLock,
cacheConfiguration.locking().lockAcquisitionTimeout()); cacheConfiguration.locking().lockAcquisitionTimeout());
if (!lockedKeys.isEmpty()) { if (!lockedKeys.isEmpty()) {
Expand Down
@@ -1,5 +1,6 @@
package org.infinispan.remoting.inboundhandler.action; package org.infinispan.remoting.inboundhandler.action;


import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.locking.ClusteringDependentLogic; import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.util.concurrent.locks.LockListener; import org.infinispan.util.concurrent.locks.LockListener;
import org.infinispan.util.concurrent.locks.LockManager; import org.infinispan.util.concurrent.locks.LockManager;
Expand Down Expand Up @@ -57,6 +58,11 @@ protected ActionStatus init(ActionState state) {
return cas(InternalState.CHECKING, InternalState.READY) ? ActionStatus.READY : ActionStatus.NOT_READY; return cas(InternalState.CHECKING, InternalState.READY) ? ActionStatus.READY : ActionStatus.NOT_READY;
} }


TxInvocationContext context = createContext(state);
if (context != null) {
keysToLock.forEach(context::addLockedKey);
}

LockPromise promise = keysToLock.size() == 1 ? LockPromise promise = keysToLock.size() == 1 ?
lockManager.lock(keysToLock.get(0), lockOwner, timeout, TimeUnit.MILLISECONDS) : lockManager.lock(keysToLock.get(0), lockOwner, timeout, TimeUnit.MILLISECONDS) :
lockManager.lockAll(keysToLock, lockOwner, timeout, TimeUnit.MILLISECONDS); lockManager.lockAll(keysToLock, lockOwner, timeout, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -91,4 +97,12 @@ public void cleanup(ActionState state) {
public void onEvent(LockState state) { public void onEvent(LockState state) {
notifier.complete(null); notifier.complete(null);
} }

private TxInvocationContext<?> createContext(ActionState state) {
RemoteLockCommand command = state.getCommand();
if (command instanceof TransactionalRemoteLockCommand) {
return ((TransactionalRemoteLockCommand) command).createContext();
}
return null;
}
} }
@@ -1,5 +1,6 @@
package org.infinispan.remoting.inboundhandler.action; package org.infinispan.remoting.inboundhandler.action;


import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.locking.ClusteringDependentLogic; import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.util.concurrent.locks.PendingLockListener; import org.infinispan.util.concurrent.locks.PendingLockListener;
Expand Down Expand Up @@ -62,14 +63,19 @@ protected ActionStatus init(ActionState state) {
final long timeout = state.getTimeout(); final long timeout = state.getTimeout();
final List<Object> keysToLock = getAndUpdateFilteredKeys(state); final List<Object> keysToLock = getAndUpdateFilteredKeys(state);


RemoteLockCommand command = state.getCommand();
if (command instanceof PrepareCommand && ((PrepareCommand) command).isRetriedCommand()) {
//clear the backup locks
context.getCacheTransaction().cleanupBackupLocks();
keysToLock.removeAll(context.getLockedKeys());
}

if (keysToLock.isEmpty()) { if (keysToLock.isEmpty()) {
//nothing to do. nobody else was able to update the state from checking, so no need to check the CAS //nothing to do. nobody else was able to update the state from checking, so no need to check the CAS
cas(InternalState.CHECKING, InternalState.READY); cas(InternalState.CHECKING, InternalState.READY);
return ActionStatus.READY; return ActionStatus.READY;
} }


keysToLock.forEach(context::addLockedKey);

PendingLockPromise promise = keysToLock.size() == 1 ? PendingLockPromise promise = keysToLock.size() == 1 ?
pendingLockManager.checkPendingTransactionsForKey(context, keysToLock.get(0), timeout, TimeUnit.MILLISECONDS) : pendingLockManager.checkPendingTransactionsForKey(context, keysToLock.get(0), timeout, TimeUnit.MILLISECONDS) :
pendingLockManager.checkPendingTransactionsForKeys(context, keysToLock, timeout, TimeUnit.MILLISECONDS); pendingLockManager.checkPendingTransactionsForKeys(context, keysToLock, timeout, TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -7,7 +7,6 @@
import org.infinispan.commands.functional.ReadOnlyManyCommand; import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand; import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand; import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.read.GetAllCommand; import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.PrepareCommand;
Expand Down Expand Up @@ -257,6 +256,9 @@ private Object handleTxCommand(TxInvocationContext ctx, TransactionBoundaryComma
// Only the originator can retry the command // Only the originator can retry the command
command.setTopologyId(retryTopologyId); command.setTopologyId(retryTopologyId);
waitForTransactionData(retryTopologyId); waitForTransactionData(retryTopologyId);
if (command instanceof PrepareCommand) {
((PrepareCommand) command).setRetriedCommand(true);
}


log.tracef("Retrying command %s for topology %d", command, retryTopologyId); log.tracef("Retrying command %s for topology %d", command, retryTopologyId);
localResult = handleTxCommand(ctx, command); localResult = handleTxCommand(ctx, command);
Expand Down
Expand Up @@ -13,21 +13,20 @@
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.transaction.xa.CacheTransaction; import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static org.infinispan.commons.util.Util.toStr; import static org.infinispan.commons.util.Util.toStr;
Expand Down Expand Up @@ -62,13 +61,11 @@ public abstract class AbstractCacheTransaction implements CacheTransaction {
/** Holds all the locks for which the local node is a secondary data owner. */ /** Holds all the locks for which the local node is a secondary data owner. */
protected volatile Set<Object> backupKeyLocks = null; protected volatile Set<Object> backupKeyLocks = null;


private volatile boolean txComplete = false;
protected final int topologyId; protected final int topologyId;


private EntryVersionsMap updatedEntryVersions; private EntryVersionsMap updatedEntryVersions;
private EntryVersionsMap versionsSeenMap; private EntryVersionsMap versionsSeenMap;

private EntryVersionsMap lookedUpRemoteVersions;
private Map<Object, EntryVersion> lookedUpRemoteVersions;


/** mark as volatile as this might be set from the tx thread code on view change*/ /** mark as volatile as this might be set from the tx thread code on view change*/
private volatile boolean isMarkedForRollback; private volatile boolean isMarkedForRollback;
Expand All @@ -87,7 +84,8 @@ public abstract class AbstractCacheTransaction implements CacheTransaction {


private volatile Flag stateTransferFlag; private volatile Flag stateTransferFlag;


private final CompletableFuture<Void> notifier; private final CompletableFuture<Void> txCompleted;
private volatile CompletableFuture<Void> backupLockReleased;


public final boolean isMarkedForRollback() { public final boolean isMarkedForRollback() {
return isMarkedForRollback; return isMarkedForRollback;
Expand All @@ -102,7 +100,8 @@ public AbstractCacheTransaction(GlobalTransaction tx, int topologyId, Equivalenc
this.topologyId = topologyId; this.topologyId = topologyId;
this.keyEquivalence = keyEquivalence; this.keyEquivalence = keyEquivalence;
this.txCreationTime = txCreationTime; this.txCreationTime = txCreationTime;
notifier = new CompletableFuture<>(); txCompleted = new CompletableFuture<>();
backupLockReleased = new CompletableFuture<>();
} }


@Override @Override
Expand Down Expand Up @@ -194,23 +193,15 @@ public boolean ownsLock(Object key) {
@Override @Override
public void notifyOnTransactionFinished() { public void notifyOnTransactionFinished() {
if (trace) log.tracef("Transaction %s has completed, notifying listening threads.", tx); if (trace) log.tracef("Transaction %s has completed, notifying listening threads.", tx);
if (!txComplete) { if (!txCompleted.isDone()) {
//avoid invalidate CPU L1 cache is tx is already completed txCompleted.complete(null);
txComplete = true; cleanupBackupLocks();
notifier.complete(null);
} }
} }


@Override @Override
public final boolean waitForLockRelease(long lockAcquisitionTimeout) throws InterruptedException { public final boolean waitForLockRelease(long lockAcquisitionTimeout) throws InterruptedException {
try { return CompletableFutures.await(txCompleted, lockAcquisitionTimeout, TimeUnit.MILLISECONDS);
notifier.get(lockAcquisitionTimeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new IllegalStateException("Should never happen", e);
} catch (TimeoutException e) {
//ignored.
}
return txComplete;
} }


@Override @Override
Expand Down Expand Up @@ -268,7 +259,7 @@ public Object findAnyLockedOrBackupLocked(Collection<Object> keys) {


@Override @Override
public boolean areLocksReleased() { public boolean areLocksReleased() {
return txComplete; return txCompleted.isDone();
} }


public Set<Object> getAffectedKeys() { public Set<Object> getAffectedKeys() {
Expand Down Expand Up @@ -307,9 +298,9 @@ public EntryVersion getLookedUpRemoteVersion(Object key) {
@Override @Override
public void putLookedUpRemoteVersion(Object key, EntryVersion version) { public void putLookedUpRemoteVersion(Object key, EntryVersion version) {
if (lookedUpRemoteVersions == null) { if (lookedUpRemoteVersions == null) {
lookedUpRemoteVersions = new HashMap<>(); lookedUpRemoteVersions = new EntryVersionsMap();
} }
lookedUpRemoteVersions.put(key, version); lookedUpRemoteVersions.put(key, (IncrementableEntryVersion) version);
} }


@Override @Override
Expand Down Expand Up @@ -379,6 +370,42 @@ public long getCreationTime() {


@Override @Override
public final void addListener(TransactionCompletedListener listener) { public final void addListener(TransactionCompletedListener listener) {
notifier.thenRun(listener::onCompletion); txCompleted.thenRun(listener::onCompletion);
}

@Override
public CompletableFuture<Void> getReleaseFutureForKey(Object key) {
if (lockedKeys != null && lockedKeys.contains(key)) {
return txCompleted;
} else if (backupKeyLocks != null && backupKeyLocks.contains(key)) {
return backupLockReleased;
}
return null;
}

@Override
public KeyValuePair<Object, CompletableFuture<Void>> getReleaseFutureForKeys(Collection<Object> keys) {
Set<Object> locked = getLockedKeys();
Set<Object> backupLocked = getBackupLockedKeys();
Object backupKey = null;
for (Object key : keys) {
if (locked.contains(key)) {
return new KeyValuePair<>(key, txCompleted);
} else if (backupLocked.contains(key)) {
backupKey = key;
}
}
return backupKey == null ? null : new KeyValuePair<>(backupKey, backupLockReleased);
}

@Override
public void cleanupBackupLocks() {
if (backupKeyLocks != null) {
synchronized (backupKeyLocks) {
backupLockReleased.complete(null);
backupLockReleased = new CompletableFuture<>();
backupKeyLocks.clear();
}
}
} }
} }
Expand Up @@ -41,6 +41,7 @@ public class DummyTransaction implements Transaction {


private static final Log log = LogFactory.getLog(DummyTransaction.class); private static final Log log = LogFactory.getLog(DummyTransaction.class);
private static boolean trace = log.isTraceEnabled(); private static boolean trace = log.isTraceEnabled();
public static final String FORCE_ROLLBACK_MESSAGE = "Force rollback invoked. (debug mode)";
private final Xid xid; private final Xid xid;
private volatile int status = Status.STATUS_UNKNOWN; private volatile int status = Status.STATUS_UNKNOWN;
private final List<Synchronization> syncs; private final List<Synchronization> syncs;
Expand Down Expand Up @@ -102,7 +103,6 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
checkDone("Cannot commit transaction."); checkDone("Cannot commit transaction.");
runPrepare(); runPrepare();
runCommit(false); runCommit(false);
throwRollbackExceptionIfAny();
} }


/** /**
Expand All @@ -128,6 +128,11 @@ public void rollback() throws IllegalStateException, SystemException {
SystemException systemException = new SystemException("Unable to rollback transaction"); SystemException systemException = new SystemException("Unable to rollback transaction");
systemException.initCause(e); systemException.initCause(e);
throw systemException; throw systemException;
} catch (RollbackException e) {
//ignored
if (trace) {
log.trace("RollbackException thrown while rolling back", e);
}
} }
} }


Expand Down Expand Up @@ -282,12 +287,20 @@ public boolean runPrepare() {
return true; return true;
} }


public void runCommit(boolean forceRollback) throws HeuristicMixedException, HeuristicRollbackException { /**
* Runs the second phase of two-phase-commit protocol.
*
* If {@code forceRollback} is {@code true}, then a {@link RollbackException} is thrown with the message {@link #FORCE_ROLLBACK_MESSAGE}.
*
*
* @param forceRollback force the transaction to rollback.
*/
public void runCommit(boolean forceRollback) throws HeuristicMixedException, HeuristicRollbackException, RollbackException {
if (trace) { if (trace) {
log.tracef("runCommit(forceRollback=%b) invoked in transaction with Xid=%s", forceRollback, xid); log.tracef("runCommit(forceRollback=%b) invoked in transaction with Xid=%s", forceRollback, xid);
} }
if (forceRollback) { if (forceRollback) {
markRollbackOnly(new RollbackException("Force rollback invoked. (debug mode)")); markRollbackOnly(new RollbackException(FORCE_ROLLBACK_MESSAGE));
} }


int notifyAfterStatus = 0; int notifyAfterStatus = 0;
Expand All @@ -304,6 +317,7 @@ public void runCommit(boolean forceRollback) throws HeuristicMixedException, Heu
notifyAfterCompletion(notifyAfterStatus); notifyAfterCompletion(notifyAfterStatus);
DummyBaseTransactionManager.setTransaction(null); DummyBaseTransactionManager.setTransaction(null);
} }
throwRollbackExceptionIfAny(forceRollback);
} }


@Override @Override
Expand Down Expand Up @@ -339,8 +353,12 @@ public final boolean equals(Object obj) {
return this == obj; return this == obj;
} }


public final void throwRollbackExceptionIfAny() throws RollbackException { private void throwRollbackExceptionIfAny(boolean forceRollback) throws RollbackException {
if (firstRollbackException != null) { if (firstRollbackException != null) {
if (forceRollback && FORCE_ROLLBACK_MESSAGE.equals(firstRollbackException.getMessage())) {
//force rollback set. don't throw it.
return;
}
throw firstRollbackException; throw firstRollbackException;
} }
} }
Expand Down

0 comments on commit 0d37774

Please sign in to comment.