From 73821c6908c08cdd429150dec7266b4931f7470b Mon Sep 17 00:00:00 2001 From: Manik Surtani Date: Fri, 3 Dec 2010 14:00:12 +0000 Subject: [PATCH] ISPN-777 Eager cluster wide locks not cleaned upon rollback * Increase the scope of the JIRA to handle any remote locks * Deal with race condition around transaction validity check and lock being acquired * Add more logging and better toString() implementations of classes * Add a stress test to recreate this behaviour --- .../commands/control/LockControlCommand.java | 6 +- .../AbstractTransactionBoundaryCommand.java | 12 +- .../infinispan/commands/tx/CommitCommand.java | 2 +- .../commands/tx/PrepareCommand.java | 4 +- .../commands/tx/RollbackCommand.java | 9 +- .../container/EntryFactoryImpl.java | 134 ++++++++------- .../interceptors/LockingInterceptor.java | 23 ++- .../xa/InvalidTransactionException.java | 26 +++ .../transaction/xa/RemoteTransaction.java | 24 ++- .../transaction/xa/TransactionTable.java | 5 +- .../util/BidirectionalLinkedHashMap.java | 7 + .../tx/RemoteLockCleanupStressTest.java | 159 ++++++++++++++++++ 12 files changed, 322 insertions(+), 89 deletions(-) create mode 100644 core/src/main/java/org/infinispan/transaction/xa/InvalidTransactionException.java create mode 100644 core/src/test/java/org/infinispan/tx/RemoteLockCleanupStressTest.java diff --git a/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java b/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java index 2c7d8738a51b..140b7711e6e8 100644 --- a/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java +++ b/core/src/main/java/org/infinispan/commands/control/LockControlCommand.java @@ -232,7 +232,7 @@ public int hashCode() { @Override public String toString() { - return "LockControlCommand{" + + return "LockControlCommand {" + "gtx=" + globalTx + ", cacheName='" + cacheName + ", implicit='" + implicit + @@ -240,8 +240,4 @@ public String toString() { ", unlock=" + unlock + ", singleKey=" + singleKey + '}'; } - - public Address getOrigin() { - return globalTx.getAddress(); - } } diff --git a/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java b/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java index 2fd84d0d85c9..721c87878404 100644 --- a/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java +++ b/core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java @@ -77,19 +77,25 @@ public Object perform(InvocationContext ctx) throws Throwable { markGtxAsRemote(); RemoteTransaction transaction = txTable.getRemoteTransaction(globalTx); if (transaction == null) { - if (trace) log.info("Not found RemoteTransaction for tx id: " + globalTx); + if (trace) log.info("Did not find a RemoteTransaction for " + globalTx); return null; } + visitRemoteTransaction(transaction); RemoteTxInvocationContext ctxt = icc.createRemoteTxInvocationContext(); ctxt.setRemoteTransaction(transaction); + try { - if (trace) log.trace("About to execute tx command :" + this); + if (trace) log.trace("About to execute tx command " + this); return invoker.invoke(ctxt, this); } finally { txTable.removeRemoteTransaction(globalTx); } } + protected void visitRemoteTransaction(RemoteTransaction tx) { + // to be overridden + } + public Object[] getParameters() { return new Object[]{globalTx, cacheName}; } @@ -117,7 +123,7 @@ public int hashCode() { @Override public String toString() { - return ", gtx=" + globalTx + + return "gtx=" + globalTx + ", cacheName='" + cacheName + '\'' + '}'; } diff --git a/core/src/main/java/org/infinispan/commands/tx/CommitCommand.java b/core/src/main/java/org/infinispan/commands/tx/CommitCommand.java index 52812bceb2db..145579a38b90 100644 --- a/core/src/main/java/org/infinispan/commands/tx/CommitCommand.java +++ b/core/src/main/java/org/infinispan/commands/tx/CommitCommand.java @@ -56,6 +56,6 @@ public byte getCommandId() { @Override public String toString() { - return "CommitCommand{" + super.toString(); + return "CommitCommand {" + super.toString(); } } diff --git a/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java b/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java index 9da16d80e6a8..7d264a397c89 100644 --- a/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java +++ b/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java @@ -175,11 +175,11 @@ public PrepareCommand copy() { @Override public String toString() { - return "PrepareCommand{" + + return "PrepareCommand {" + "gtx=" + globalTx + ", modifications=" + (modifications == null ? null : Arrays.asList(modifications)) + ", onePhaseCommit=" + onePhaseCommit + - "} " + super.toString(); + ", " + super.toString(); } public boolean containsModificationType(Class replicableCommandClass) { diff --git a/core/src/main/java/org/infinispan/commands/tx/RollbackCommand.java b/core/src/main/java/org/infinispan/commands/tx/RollbackCommand.java index 69ac05e52dd3..c656f5417635 100644 --- a/core/src/main/java/org/infinispan/commands/tx/RollbackCommand.java +++ b/core/src/main/java/org/infinispan/commands/tx/RollbackCommand.java @@ -23,11 +23,13 @@ import org.infinispan.commands.Visitor; import org.infinispan.context.InvocationContext; +import org.infinispan.context.impl.RemoteTxInvocationContext; import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.marshall.Ids; import org.infinispan.marshall.Marshallable; import org.infinispan.marshall.exts.ReplicableCommandExternalizer; import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.transaction.xa.RemoteTransaction; /** * Command corresponding to a transaction rollback. @@ -50,12 +52,17 @@ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throw return visitor.visitRollbackCommand((TxInvocationContext) ctx, this); } + @Override + public void visitRemoteTransaction(RemoteTransaction tx) { + tx.invalidate(); + } + public byte getCommandId() { return COMMAND_ID; } @Override public String toString() { - return "RollbackCommand{ " + super.toString(); + return "RollbackCommand {" + super.toString(); } } diff --git a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java index 96a996be1013..003880685df6 100644 --- a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java +++ b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java @@ -35,6 +35,7 @@ import org.infinispan.factories.annotations.Start; import org.infinispan.marshall.MarshalledValue; import org.infinispan.notifications.cachelistener.CacheNotifier; +import org.infinispan.transaction.xa.InvalidTransactionException; import org.infinispan.util.Util; import org.infinispan.util.concurrent.IsolationLevel; import org.infinispan.util.concurrent.TimeoutException; @@ -111,79 +112,88 @@ public MVCCEntry wrapEntryForWriting(InvocationContext ctx, InternalCacheEntry e } private MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, InternalCacheEntry entry, boolean createIfAbsent, boolean forceLockIfAbsent, boolean alreadyLocked, boolean forRemoval, boolean undeleteIfNeeded) throws InterruptedException { - CacheEntry cacheEntry = ctx.lookupEntry(key); - MVCCEntry mvccEntry = null; - if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null; - if (cacheEntry != null) // exists in context! Just acquire lock if needed, and wrap. - { - if (trace) log.trace("Exists in context."); - // Acquire lock if needed. Add necessary check for skip locking in advance in order to avoid marshalled value issues - if (alreadyLocked || ctx.hasFlag(Flag.SKIP_LOCKING) || acquireLock(ctx, key)) { - - if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) { - mvccEntry = (MVCCEntry) cacheEntry; - } else { - // this is a read-only entry that needs to be copied to a proper read-write entry!! - mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan()); - cacheEntry = mvccEntry; - ctx.putLookedUpEntry(key, cacheEntry); + try { + CacheEntry cacheEntry = ctx.lookupEntry(key); + MVCCEntry mvccEntry = null; + if (createIfAbsent && cacheEntry != null && cacheEntry.isNull()) cacheEntry = null; + if (cacheEntry != null) // exists in context! Just acquire lock if needed, and wrap. + { + if (trace) log.trace("Exists in context."); + // Acquire lock if needed. Add necessary check for skip locking in advance in order to avoid marshalled value issues + if (alreadyLocked || ctx.hasFlag(Flag.SKIP_LOCKING) || acquireLock(ctx, key)) { + + if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) { + mvccEntry = (MVCCEntry) cacheEntry; + } else { + // this is a read-only entry that needs to be copied to a proper read-write entry!! + mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, forRemoval, cacheEntry.getLifespan()); + cacheEntry = mvccEntry; + ctx.putLookedUpEntry(key, cacheEntry); + } + + // create a copy of the underlying entry + mvccEntry.copyForUpdate(container, writeSkewCheck); + } else if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) { + // If lock was already held and force write lock is on, just wrap + if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) { + mvccEntry = (MVCCEntry) cacheEntry; + } + } + + if (cacheEntry.isRemoved() && createIfAbsent && undeleteIfNeeded) { + if (trace) log.trace("Entry is deleted in current scope. Need to un-delete."); + if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry; + mvccEntry.setRemoved(false); + mvccEntry.setValid(true); } - // create a copy of the underlying entry - mvccEntry.copyForUpdate(container, writeSkewCheck); - } else if (ctx.hasFlag(Flag.FORCE_WRITE_LOCK)) { - // If lock was already held and force write lock is on, just wrap - if (cacheEntry instanceof MVCCEntry && (!forRemoval || !(cacheEntry instanceof NullMarkerEntry))) { - mvccEntry = (MVCCEntry) cacheEntry; + return mvccEntry; + + } else { + boolean lockAcquired = false; + if (!alreadyLocked) { + lockAcquired = acquireLock(ctx, key); + } + // else, fetch from dataContainer or used passed entry. + cacheEntry = entry != null ? entry : container.get(key); + if (cacheEntry != null) { + if (trace) log.trace("Retrieved from container."); + // exists in cache! Just acquire lock if needed, and wrap. + // do we need a lock? + boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy + mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan()); + ctx.putLookedUpEntry(key, mvccEntry); + if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck); + } else if (createIfAbsent) { + // this is the *only* point where new entries can be created!! + if (trace) log.trace("Creating new entry."); + // now to lock and create the entry. Lock first to prevent concurrent creation! + notifier.notifyCacheEntryCreated(key, true, ctx); + mvccEntry = createWrappedEntry(key, null, true, false, -1); + mvccEntry.setCreated(true); + ctx.putLookedUpEntry(key, mvccEntry); + mvccEntry.copyForUpdate(container, writeSkewCheck); + notifier.notifyCacheEntryCreated(key, false, ctx); + } else { + releaseLock(key); } } - if (cacheEntry.isRemoved() && createIfAbsent && undeleteIfNeeded) { - if (trace) log.trace("Entry is deleted in current scope. Need to un-delete."); - if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry; - mvccEntry.setRemoved(false); - mvccEntry.setValid(true); + // see if we need to force the lock on nonexistent entries. + if (mvccEntry == null && forceLockIfAbsent) { + // make sure we record this! Null value since this is a forced lock on the key + if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null); } return mvccEntry; - - } else { - boolean lockAcquired = false; - if (!alreadyLocked) { - lockAcquired = acquireLock(ctx, key); - } - // else, fetch from dataContainer or used passed entry. - cacheEntry = entry != null ? entry : container.get(key); - if (cacheEntry != null) { - if (trace) log.trace("Retrieved from container."); - // exists in cache! Just acquire lock if needed, and wrap. - // do we need a lock? - boolean needToCopy = alreadyLocked || lockAcquired || ctx.hasFlag(Flag.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy - mvccEntry = createWrappedEntry(key, cacheEntry.getValue(), false, false, cacheEntry.getLifespan()); - ctx.putLookedUpEntry(key, mvccEntry); - if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck); - } else if (createIfAbsent) { - // this is the *only* point where new entries can be created!! - if (trace) log.trace("Creating new entry."); - // now to lock and create the entry. Lock first to prevent concurrent creation! - notifier.notifyCacheEntryCreated(key, true, ctx); - mvccEntry = createWrappedEntry(key, null, true, false, -1); - mvccEntry.setCreated(true); - ctx.putLookedUpEntry(key, mvccEntry); - mvccEntry.copyForUpdate(container, writeSkewCheck); - notifier.notifyCacheEntryCreated(key, false, ctx); - } else { + } catch (InvalidTransactionException ite) { + try { releaseLock(key); + } catch (Exception e) { + // may not be necessary? } + throw ite; } - - // see if we need to force the lock on nonexistent entries. - if (mvccEntry == null && forceLockIfAbsent) { - // make sure we record this! Null value since this is a forced lock on the key - if (acquireLock(ctx, key)) ctx.putLookedUpEntry(key, null); - } - - return mvccEntry; } /** diff --git a/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java index ecf045e9a2bc..9f71f52c656e 100644 --- a/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java @@ -24,6 +24,7 @@ import org.infinispan.CacheException; import org.infinispan.commands.control.LockControlCommand; import org.infinispan.commands.read.GetKeyValueCommand; +import org.infinispan.commands.tx.AbstractTransactionBoundaryCommand; import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.RollbackCommand; @@ -46,7 +47,7 @@ import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.base.CommandInterceptor; -import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Transport; import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.util.ReversibleOrderedSet; @@ -55,9 +56,11 @@ import org.infinispan.util.concurrent.locks.LockManager; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** * Interceptor to implement MVCC functionality. @@ -113,9 +116,19 @@ public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand comm } } + private void abortIfRemoteTransactionInvalid(TxInvocationContext ctx, AbstractTransactionBoundaryCommand c) { + // this check fixes ISPN-777 + if (!ctx.isOriginLocal()) { + Address origin = c.getGlobalTransaction().getAddress(); + if (!transport.getMembers().contains(origin)) + throw new CacheException("Member " + origin + " no longer in cluster. Forcing tx rollback for " + c.getGlobalTransaction()); + } + } + @Override public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { try { + abortIfRemoteTransactionInvalid(ctx, command); return invokeNextInterceptor(ctx, command); } catch (TimeoutException te) { cleanupLocks(ctx, false); @@ -142,13 +155,9 @@ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand public Object visitLockControlCommand(TxInvocationContext ctx, LockControlCommand c) throws Throwable { boolean localTxScope = ctx.isOriginLocal() && ctx.isInTxScope(); boolean shouldInvokeOnCluster = false; - - try { - - if (!ctx.isOriginLocal() && !transport.getMembers().contains(c.getOrigin())) { //ISPN-777 - throw new CacheException("Member " + c.getOrigin() + " no longer in cluster. Forcing tx rollback for tx: " + c.getGlobalTransaction()); - } + try { + abortIfRemoteTransactionInvalid(ctx, c); if (localTxScope) { c.attachGlobalTransaction((GlobalTransaction) ctx.getLockOwner()); } diff --git a/core/src/main/java/org/infinispan/transaction/xa/InvalidTransactionException.java b/core/src/main/java/org/infinispan/transaction/xa/InvalidTransactionException.java new file mode 100644 index 000000000000..8cb5a774bbd3 --- /dev/null +++ b/core/src/main/java/org/infinispan/transaction/xa/InvalidTransactionException.java @@ -0,0 +1,26 @@ +package org.infinispan.transaction.xa; + +import org.infinispan.CacheException; + +/** + * Thrown if an operation is to be performed on an invalid transaction context. + * + * @author Manik Surtani + * @since 4.2 + */ +public class InvalidTransactionException extends CacheException { + public InvalidTransactionException() { + } + + public InvalidTransactionException(Throwable cause) { + super(cause); + } + + public InvalidTransactionException(String msg) { + super(msg); + } + + public InvalidTransactionException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/core/src/main/java/org/infinispan/transaction/xa/RemoteTransaction.java b/core/src/main/java/org/infinispan/transaction/xa/RemoteTransaction.java index aea407241dca..6c7a669e8c14 100644 --- a/core/src/main/java/org/infinispan/transaction/xa/RemoteTransaction.java +++ b/core/src/main/java/org/infinispan/transaction/xa/RemoteTransaction.java @@ -31,9 +31,11 @@ public class RemoteTransaction implements CacheTransaction, Cloneable { private GlobalTransaction tx; + private volatile boolean valid = true; + public RemoteTransaction(WriteCommand[] modifications, GlobalTransaction tx) { - this.modifications = modifications == null || modifications.length == 0 ? Collections.emptyList(): Arrays.asList(modifications); + this.modifications = modifications == null || modifications.length == 0 ? Collections.emptyList() : Arrays.asList(modifications); lookedUpEntries = new BidirectionalLinkedHashMap(this.modifications.size()); this.tx = tx; } @@ -44,6 +46,10 @@ public RemoteTransaction(GlobalTransaction tx) { this.tx = tx; } + public void invalidate() { + valid = false; + } + public GlobalTransaction getGlobalTransaction() { return tx; } @@ -51,9 +57,9 @@ public GlobalTransaction getGlobalTransaction() { public List getModifications() { return modifications; } - - public void setModifications(WriteCommand[] modifications){ - this.modifications = Arrays.asList(modifications); + + public void setModifications(WriteCommand[] modifications) { + this.modifications = Arrays.asList(modifications); } public CacheEntry lookupEntry(Object key) { @@ -65,10 +71,14 @@ public BidirectionalMap getLookedUpEntries() { } public void putLookedUpEntry(Object key, CacheEntry e) { - if (log.isTraceEnabled()) { - log.trace("Adding key " + key + " to tx " + getGlobalTransaction()); + if (valid) { + if (log.isTraceEnabled()) { + log.trace("Adding key " + key + " to tx " + getGlobalTransaction()); + } + lookedUpEntries.put(key, e); + } else { + throw new InvalidTransactionException("This remote transaction " + getGlobalTransaction() + " is invalid"); } - lookedUpEntries.put(key, e); } public void removeLookedUpEntry(Object key) { diff --git a/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java b/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java index e87e1d15407f..11915ffcf810 100644 --- a/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java +++ b/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java @@ -131,12 +131,15 @@ public void run() { rc.init(invoker, icc, TransactionTable.this); try { rc.perform(null); + if (trace) log.trace("Rollback of {0} complete.", gtx); } catch (Throwable e) { log.warn("Unable to roll back gtx " + gtx, e); } finally { removeRemoteTransaction(gtx); } } + + if (trace) log.trace("Completed cleaning stale locks."); } }); } catch (RejectedExecutionException ree) { @@ -230,7 +233,7 @@ public boolean removeLocalTransaction(Transaction tx) { public boolean removeRemoteTransaction(GlobalTransaction txId) { boolean existed = remoteTransactions.remove(txId) != null; if (trace) { - log.trace("Removed " + txId + " from transaction table. Returning " + existed); + log.trace("Removed " + txId + " from transaction table. Transaction existed? " + existed); } return existed; } diff --git a/core/src/main/java/org/infinispan/util/BidirectionalLinkedHashMap.java b/core/src/main/java/org/infinispan/util/BidirectionalLinkedHashMap.java index b498e4359d32..8782745fcfd7 100644 --- a/core/src/main/java/org/infinispan/util/BidirectionalLinkedHashMap.java +++ b/core/src/main/java/org/infinispan/util/BidirectionalLinkedHashMap.java @@ -807,4 +807,11 @@ public BidirectionalLinkedHashMap clone() { return result; } + + @Override + public String toString() { + return "BidirectionalLinkedHashMap{" + + "size=" + size + + '}'; + } } diff --git a/core/src/test/java/org/infinispan/tx/RemoteLockCleanupStressTest.java b/core/src/test/java/org/infinispan/tx/RemoteLockCleanupStressTest.java new file mode 100644 index 000000000000..f3d7262771a9 --- /dev/null +++ b/core/src/test/java/org/infinispan/tx/RemoteLockCleanupStressTest.java @@ -0,0 +1,159 @@ +package org.infinispan.tx; + +import org.infinispan.Cache; +import org.infinispan.config.Configuration; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.remoting.transport.Address; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.infinispan.test.fwk.CleanupAfterMethod; +import org.infinispan.test.fwk.TestCacheManagerFactory; +import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.util.concurrent.locks.LockManager; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; +import org.testng.annotations.Test; + +import javax.transaction.Status; +import javax.transaction.TransactionManager; + +import static org.infinispan.test.TestingUtil.sleepThread; + +@Test (groups = "functional", testName = "lock.RemoteLockCleanupStressTest", invocationCount = 20) +@CleanupAfterMethod +public class RemoteLockCleanupStressTest extends MultipleCacheManagersTest { + + private static final Log log = LogFactory.getLog(RemoteLockCleanupStressTest.class); + + EmbeddedCacheManager cm1, cm2; + String key = "locked-counter"; + + @Override + protected void createCacheManagers() throws Throwable { + Configuration c = TestCacheManagerFactory.getDefaultConfiguration(true,Configuration.CacheMode.REPL_SYNC); + c.setFetchInMemoryState(true); + c.setLockAcquisitionTimeout(1500); + + cm1 = addClusterEnabledCacheManager(c); + cm2 = addClusterEnabledCacheManager(c); + } + + public void testLockRelease() { + + Thread t1 = new Thread(new CounterTask(cm1)); + Thread t2 = new Thread(new CounterTask(cm2)); + + t1.start(); + t2.start(); + + sleepThread(1000); + t2.interrupt(); + TestingUtil.killCacheManagers(cm2); + sleepThread(1100); + t1.interrupt(); + LockManager lm = TestingUtil.extractComponent(cm1.getCache(), LockManager.class); + Object owner = lm.getOwner(key); + assert ownerIsLocalOrUnlocked(owner, cm1.getAddress()) : "Bad lock owner " + owner; + } + + private boolean ownerIsLocalOrUnlocked(Object owner, Address self) { + if (owner == null) return true; + if (owner instanceof GlobalTransaction) { + GlobalTransaction gtx = ((GlobalTransaction) owner); + return gtx.getAddress().equals(self); + } else { + return false; + } + } + + class CounterTask implements Runnable { + EmbeddedCacheManager cm; + + CounterTask(EmbeddedCacheManager cm) { + this.cm = cm; + } + + @Override + public void run() { + for (int i=0; i<25; i++) run_(); + } + + public void run_() { + Cache cache = cm.getCache(); + TransactionManager tx = cache.getAdvancedCache().getTransactionManager(); + try { + tx.begin(); + } catch (Exception ex) { + log.debug("Exception starting transaction", ex); + } + + try { + log.debug("aquiring lock on cache " + cache.getName() + " key " + key + "..."); + cache.getAdvancedCache().lock(key); + + Integer val = (Integer) cache.get(key); + log.debug("current value : " + val); + if (val == null) { + val = 0; + } else { + val++; + + } + cache.put(key, val); + TestingUtil.sleepRandom(200); + + log.debug("commit..."); + tx.commit(); + log.debug("done commit"); + } catch (Exception ex) { + try { + log.debug("rollback... " + ex.getLocalizedMessage()); + tx.rollback(); + log.debug("done rollback"); + } catch (Exception rex) { + log.debug("Exception rolling back", rex); + } + } finally { + try { + log.debug("tx status at the end : "); + switch (tx.getStatus()) { + case Status.STATUS_ACTIVE: + log.debug("active"); + break; + case Status.STATUS_COMMITTED: + log.debug("committed"); + break; + case Status.STATUS_COMMITTING: + log.debug("committing"); + break; + case Status.STATUS_MARKED_ROLLBACK: + log.debug("makerd rollback"); + break; + case Status.STATUS_NO_TRANSACTION: + log.debug("no transaction"); + break; + case Status.STATUS_PREPARED: + log.debug("preprared"); + break; + case Status.STATUS_PREPARING: + log.debug("preparing"); + break; + case Status.STATUS_ROLLEDBACK: + log.debug("rolledback"); + break; + case Status.STATUS_ROLLING_BACK: + log.debug("rolling back"); + break; + case Status.STATUS_UNKNOWN: + log.debug("unknown"); + break; + default: + log.debug(tx.getStatus()); + } + } catch (Exception ex) { + log.debug("Exception retrieving transaction status", ex); + } + } + } + } +} \ No newline at end of file