Skip to content

Commit

Permalink
ISPN-777 Eager cluster wide locks not cleaned upon rollback
Browse files Browse the repository at this point in the history
* Increase the scope of the JIRA to handle any remote locks
* Deal with race condition around transaction validity check and lock being acquired
* Add more logging and better toString() implementations of classes
* Add a stress test to recreate this behaviour
  • Loading branch information
maniksurtani authored and galderz committed Dec 3, 2010
1 parent a919c50 commit 73821c6
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 89 deletions.
Expand Up @@ -232,16 +232,12 @@ public int hashCode() {

@Override
public String toString() {
return "LockControlCommand{" +
return "LockControlCommand {" +
"gtx=" + globalTx +
", cacheName='" + cacheName +
", implicit='" + implicit +
", keys=" + keys +
", unlock=" + unlock +
", singleKey=" + singleKey + '}';
}

public Address getOrigin() {
return globalTx.getAddress();
}
}
Expand Up @@ -77,19 +77,25 @@ public Object perform(InvocationContext ctx) throws Throwable {
markGtxAsRemote();
RemoteTransaction transaction = txTable.getRemoteTransaction(globalTx);
if (transaction == null) {
if (trace) log.info("Not found RemoteTransaction for tx id: " + globalTx);
if (trace) log.info("Did not find a RemoteTransaction for " + globalTx);
return null;
}
visitRemoteTransaction(transaction);
RemoteTxInvocationContext ctxt = icc.createRemoteTxInvocationContext();
ctxt.setRemoteTransaction(transaction);

try {
if (trace) log.trace("About to execute tx command :" + this);
if (trace) log.trace("About to execute tx command " + this);
return invoker.invoke(ctxt, this);
} finally {
txTable.removeRemoteTransaction(globalTx);
}
}

protected void visitRemoteTransaction(RemoteTransaction tx) {
// to be overridden
}

public Object[] getParameters() {
return new Object[]{globalTx, cacheName};
}
Expand Down Expand Up @@ -117,7 +123,7 @@ public int hashCode() {

@Override
public String toString() {
return ", gtx=" + globalTx +
return "gtx=" + globalTx +
", cacheName='" + cacheName + '\'' +
'}';
}
Expand Down
Expand Up @@ -56,6 +56,6 @@ public byte getCommandId() {

@Override
public String toString() {
return "CommitCommand{" + super.toString();
return "CommitCommand {" + super.toString();
}
}
Expand Up @@ -175,11 +175,11 @@ public PrepareCommand copy() {

@Override
public String toString() {
return "PrepareCommand{" +
return "PrepareCommand {" +
"gtx=" + globalTx +
", modifications=" + (modifications == null ? null : Arrays.asList(modifications)) +
", onePhaseCommit=" + onePhaseCommit +
"} " + super.toString();
", " + super.toString();
}

public boolean containsModificationType(Class<? extends ReplicableCommand> replicableCommandClass) {
Expand Down
Expand Up @@ -23,11 +23,13 @@

import org.infinispan.commands.Visitor;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.RemoteTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.marshall.Ids;
import org.infinispan.marshall.Marshallable;
import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.RemoteTransaction;

/**
* Command corresponding to a transaction rollback.
Expand All @@ -50,12 +52,17 @@ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throw
return visitor.visitRollbackCommand((TxInvocationContext) ctx, this);
}

@Override
public void visitRemoteTransaction(RemoteTransaction tx) {
tx.invalidate();
}

public byte getCommandId() {
return COMMAND_ID;
}

@Override
public String toString() {
return "RollbackCommand{ " + super.toString();
return "RollbackCommand {" + super.toString();
}
}
134 changes: 72 additions & 62 deletions core/src/main/java/org/infinispan/container/EntryFactoryImpl.java
Expand Up @@ -35,6 +35,7 @@
import org.infinispan.factories.annotations.Start;
import org.infinispan.marshall.MarshalledValue;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.transaction.xa.InvalidTransactionException;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -111,79 +112,88 @@ public MVCCEntry wrapEntryForWriting(InvocationContext ctx, InternalCacheEntry e
}

private MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, InternalCacheEntry entry, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval, boolean undeleteIfNeeded) throws InterruptedException {
CacheEntry cacheEntry = ctx.lookupEntry(key);
MVCCEntry mvccEntry = null;
if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
if (cacheEntry != null) // exists in context! Just acquire lock if needed, and wrap.
{
if (trace) log.trace("Exists in context.");
// Acquire lock if needed. Add necessary check for skip locking in advance in order to avoid marshalled value issues
if (alreadyLocked || ctx.hasFlag(Flag.SKIP_LOCKING) || acquireLock(ctx, key)) {

if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
mvccEntry = (MVCCEntry) cacheEntry;
} else {
// this is a read-only entry that needs to be copied to a proper read-write entry!!
mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
cacheEntry = mvccEntry;
ctx.putLookedUpEntry(key, cacheEntry);
try {
CacheEntry cacheEntry = ctx.lookupEntry(key);
MVCCEntry mvccEntry = null;
if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null;
if (cacheEntry != null) // exists in context! Just acquire lock if needed, and wrap.
{
if (trace) log.trace("Exists in context.");
// Acquire lock if needed. Add necessary check for skip locking in advance in order to avoid marshalled value issues
if (alreadyLocked || ctx.hasFlag(Flag.SKIP_LOCKING) || acquireLock(ctx, key)) {

if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
mvccEntry = (MVCCEntry) cacheEntry;
} else {
// this is a read-only entry that needs to be copied to a proper read-write entry!!
mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan());
cacheEntry = mvccEntry;
ctx.putLookedUpEntry(key, cacheEntry);
}

// create a copy of the underlying entry
mvccEntry.copyForUpdate(container, writeSkewCheck);
} else if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
// If lock was already held and force write lock is on, just wrap
if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
mvccEntry = (MVCCEntry) cacheEntry;
}
}

if (cacheEntry.isRemoved() && createIfAbsent && undeleteIfNeeded) {
if (trace) log.trace("Entry is deleted in current scope. Need to un-delete.");
if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
mvccEntry.setRemoved(false);
mvccEntry.setValid(true);
}

// create a copy of the underlying entry
mvccEntry.copyForUpdate(container, writeSkewCheck);
} else if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) {
// If lock was already held and force write lock is on, just wrap
if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) {
mvccEntry = (MVCCEntry) cacheEntry;
return mvccEntry;

} else {
boolean lockAcquired = false;
if (!alreadyLocked) {
lockAcquired = acquireLock(ctx, key);
}
// else, fetch from dataContainer or used passed entry.
cacheEntry = entry != null ? entry : container.get(key);
if (cacheEntry != null) {
if (trace) log.trace("Retrieved from container.");
// exists in cache! Just acquire lock if needed, and wrap.
// do we need a lock?
boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
ctx.putLookedUpEntry(key, mvccEntry);
if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
} else if (createIfAbsent) {
// this is the *only* point where new entries can be created!!
if (trace) log.trace("Creating new entry.");
// now to lock and create the entry. Lock first to prevent concurrent creation!
notifier.notifyCacheEntryCreated(key, true, ctx);
mvccEntry = createWrappedEntry(key, null, true, false, -1);
mvccEntry.setCreated(true);
ctx.putLookedUpEntry(key, mvccEntry);
mvccEntry.copyForUpdate(container, writeSkewCheck);
notifier.notifyCacheEntryCreated(key, false, ctx);
} else {
releaseLock(key);
}
}

if (cacheEntry.isRemoved() && createIfAbsent && undeleteIfNeeded) {
if (trace) log.trace("Entry is deleted in current scope. Need to un-delete.");
if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
mvccEntry.setRemoved(false);
mvccEntry.setValid(true);
// see if we need to force the lock on nonexistent entries.
if (mvccEntry == null && forceLockIfAbsent) {
// make sure we record this! Null value since this is a forced lock on the key
if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
}

return mvccEntry;

} else {
boolean lockAcquired = false;
if (!alreadyLocked) {
lockAcquired = acquireLock(ctx, key);
}
// else, fetch from dataContainer or used passed entry.
cacheEntry = entry != null ? entry : container.get(key);
if (cacheEntry != null) {
if (trace) log.trace("Retrieved from container.");
// exists in cache! Just acquire lock if needed, and wrap.
// do we need a lock?
boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan());
ctx.putLookedUpEntry(key, mvccEntry);
if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
} else if (createIfAbsent) {
// this is the *only* point where new entries can be created!!
if (trace) log.trace("Creating new entry.");
// now to lock and create the entry. Lock first to prevent concurrent creation!
notifier.notifyCacheEntryCreated(key, true, ctx);
mvccEntry = createWrappedEntry(key, null, true, false, -1);
mvccEntry.setCreated(true);
ctx.putLookedUpEntry(key, mvccEntry);
mvccEntry.copyForUpdate(container, writeSkewCheck);
notifier.notifyCacheEntryCreated(key, false, ctx);
} else {
} catch (InvalidTransactionException ite) {
try {
releaseLock(key);
} catch (Exception e) {
// may not be necessary?
}
throw ite;
}

// see if we need to force the lock on nonexistent entries.
if (mvccEntry == null && forceLockIfAbsent) {
// make sure we record this! Null value since this is a forced lock on the key
if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null);
}

return mvccEntry;
}

/**
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.infinispan.CacheException;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.AbstractTransactionBoundaryCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
Expand All @@ -46,7 +47,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ReversibleOrderedSet;
Expand All @@ -55,9 +56,11 @@
import org.infinispan.util.concurrent.locks.LockManager;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
Expand Down Expand Up @@ -113,9 +116,19 @@ public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand comm
}
}

private void abortIfRemoteTransactionInvalid(TxInvocationContext ctx, AbstractTransactionBoundaryCommand c) {
// this check fixes ISPN-777
if (!ctx.isOriginLocal()) {
Address origin = c.getGlobalTransaction().getAddress();
if (!transport.getMembers().contains(origin))
throw new CacheException("Member " + origin + " no longer in cluster. Forcing tx rollback for " + c.getGlobalTransaction());
}
}

@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
try {
abortIfRemoteTransactionInvalid(ctx, command);
return invokeNextInterceptor(ctx, command);
} catch (TimeoutException te) {
cleanupLocks(ctx, false);
Expand All @@ -142,13 +155,9 @@ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand c) throws Throwable {
boolean localTxScope = ctx.isOriginLocal() && ctx.isInTxScope();
boolean shouldInvokeOnCluster = false;

try {

if (!ctx.isOriginLocal() && !transport.getMembers().contains(c.getOrigin())) { //ISPN-777
throw new CacheException("Member " + c.getOrigin() + " no longer in cluster. Forcing tx rollback for tx: " + c.getGlobalTransaction());
}

try {
abortIfRemoteTransactionInvalid(ctx, c);
if (localTxScope) {
c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner());
}
Expand Down
@@ -0,0 +1,26 @@
package org.infinispan.transaction.xa;

import org.infinispan.CacheException;

/**
* Thrown if an operation is to be performed on an invalid transaction context.
*
* @author Manik Surtani
* @since 4.2
*/
public class InvalidTransactionException extends CacheException {
public InvalidTransactionException() {
}

public InvalidTransactionException(Throwable cause) {
super(cause);
}

public InvalidTransactionException(String msg) {
super(msg);
}

public InvalidTransactionException(String msg, Throwable cause) {
super(msg, cause);
}
}

0 comments on commit 73821c6

Please sign in to comment.