Skip to content

Commit

Permalink
ISPN-8494 Clear is leaking transaction with Batching
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and danberindei committed Mar 28, 2018
1 parent 83c8761 commit 648100e
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 193 deletions.
14 changes: 7 additions & 7 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Expand Up @@ -590,7 +590,7 @@ final void removeGroup(String groupName, long explicitFlags) {
} }


private void transactionalRemoveGroup(String groupName, long explicitFlagsBitSet) { private void transactionalRemoveGroup(String groupName, long explicitFlagsBitSet) {
final boolean onGoingTransaction = getOngoingTransaction() != null; final boolean onGoingTransaction = getOngoingTransaction(true) != null;
if (!onGoingTransaction) { if (!onGoingTransaction) {
tryBegin(); tryBegin();
} }
Expand Down Expand Up @@ -945,7 +945,7 @@ InvocationContext getInvocationContextWithImplicitTransaction(boolean isPutForEx
boolean txInjected = false; boolean txInjected = false;
if (transactional) { if (transactional) {
if (!isPutForExternalRead) { if (!isPutForExternalRead) {
Transaction transaction = getOngoingTransaction(); Transaction transaction = getOngoingTransaction(true);
if (transaction == null && config.transaction().autoCommit()) { if (transaction == null && config.transaction().autoCommit()) {
transaction = tryBegin(); transaction = tryBegin();
txInjected = true; txInjected = true;
Expand Down Expand Up @@ -1694,12 +1694,12 @@ public AdvancedCache<K, V> withSubject(Subject subject) {
return this; // NO-OP return this; // NO-OP
} }


private Transaction getOngoingTransaction() { private Transaction getOngoingTransaction(boolean includeBatchTx) {
try { try {
Transaction transaction = null; Transaction transaction = null;
if (transactionManager != null) { if (transactionManager != null) {
transaction = transactionManager.getTransaction(); transaction = transactionManager.getTransaction();
if (transaction == null && batchingEnabled) { if (includeBatchTx && transaction == null && batchingEnabled) {
transaction = batchContainer.getBatchTransaction(); transaction = batchContainer.getBatchTransaction();
} }
} }
Expand Down Expand Up @@ -1780,7 +1780,7 @@ private Transaction tryBegin() {
} }
try { try {
transactionManager.begin(); transactionManager.begin();
final Transaction transaction = getOngoingTransaction(); final Transaction transaction = getOngoingTransaction(true);
if (trace) { if (trace) {
log.tracef("Implicit transaction started! Transaction: %s", transaction); log.tracef("Implicit transaction started! Transaction: %s", transaction);
} }
Expand All @@ -1805,7 +1805,7 @@ private void tryCommit() {
return; return;
} }
if (trace) if (trace)
log.tracef("Committing transaction as it was implicit: %s", getOngoingTransaction()); log.tracef("Committing transaction as it was implicit: %s", getOngoingTransaction(true));
try { try {
transactionManager.commit(); transactionManager.commit();
} catch (Throwable e) { } catch (Throwable e) {
Expand Down Expand Up @@ -1864,7 +1864,7 @@ public CompletableFuture<V> putAsync(K key, V value, Metadata metadata) {
} }


private Transaction suspendOngoingTransactionIfExists() { private Transaction suspendOngoingTransactionIfExists() {
final Transaction tx = getOngoingTransaction(); final Transaction tx = getOngoingTransaction(false);
if (tx != null) { if (tx != null) {
try { try {
transactionManager.suspend(); transactionManager.suspend();
Expand Down
Expand Up @@ -6,9 +6,12 @@


import org.infinispan.batch.BatchContainer; import org.infinispan.batch.BatchContainer;
import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.EvictCommand; import org.infinispan.commands.write.EvictCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory; import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.DDAsyncInterceptor; import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
Expand All @@ -28,11 +31,24 @@ public class BatchingInterceptor extends DDAsyncInterceptor {
private static final Log log = LogFactory.getLog(BatchingInterceptor.class); private static final Log log = LogFactory.getLog(BatchingInterceptor.class);


@Override @Override
public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable { public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) {
// eviction is non-tx, so this interceptor should be no-op for EvictCommands // eviction is non-tx, so this interceptor should be no-op for EvictCommands
return invokeNext(ctx, command); return invokeNext(ctx, command);
} }


@Override
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) {
//clear is non transactional and it suspends all running tx before invocation. nothing to do here.
return invokeNext(ctx, command);
}

@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
return command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ) ?
invokeNext(ctx, command) :
handleDefault(ctx, command);
}

/** /**
* Simply check if there is an ongoing tx. <ul> <li>If there is one, this is a no-op and just passes the call up the * Simply check if there is an ongoing tx. <ul> <li>If there is one, this is a no-op and just passes the call up the
* chain.</li> <li>If there isn't one and there is a batch in progress, resume the batch's tx, pass up, and finally * chain.</li> <li>If there isn't one and there is a batch in progress, resume the batch's tx, pass up, and finally
Expand Down
90 changes: 73 additions & 17 deletions core/src/test/java/org/infinispan/api/batch/AbstractBatchTest.java
@@ -1,23 +1,79 @@
package org.infinispan.api.batch; package org.infinispan.api.batch;


import java.util.concurrent.atomic.AtomicReference; import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;

import java.lang.reflect.Method;
import java.util.concurrent.Future;

import javax.transaction.SystemException;
import javax.transaction.TransactionManager;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.test.AbstractInfinispanTest; import org.infinispan.manager.EmbeddedCacheManager;

import org.infinispan.test.SingleCacheManagerTest;
public abstract class AbstractBatchTest extends AbstractInfinispanTest { import org.infinispan.test.fwk.TestCacheManagerFactory;
protected String getOnDifferentThread(final Cache<String, String> cache, final String key) throws InterruptedException { import org.testng.annotations.Test;
final AtomicReference<String> ref = new AtomicReference<String>();
Thread t = new Thread() { @Test(groups = {"functional", "transaction", "smoke"})
public void run() { public abstract class AbstractBatchTest extends SingleCacheManagerTest {
cache.startBatch();
ref.set(cache.get(key)); @Override
cache.endBatch(true); public EmbeddedCacheManager createCacheManager() {
} return TestCacheManagerFactory.createCacheManager(false);
}; }


t.start(); public void testClearInBatch(Method method) {
t.join(); //tests if the clear doesn't leak the batch transaction.
return ref.get(); //if it does, the get() will be executed against a committed transaction and it will fail.
Cache<String, String> cache = createCache(method.getName());
cache.put("k2", "v2");

cache.startBatch();
cache.clear();
cache.put("k1", "v1");
cache.endBatch(true);

assertEquals(null, cache.get("k2"));
assertEquals("v1", cache.get("k1"));
} }

public void testPutForExternalReadInBatch(Method method) {
//tests if the putForExternalRead doesn't leak the batch transaction.
//if it does, the get() will be executed against a committed transaction and it will fail.
Cache<String, String> cache = createCache(method.getName());

cache.startBatch();
cache.putForExternalRead("k1", "v1");
cache.put("k2", "v2");
cache.endBatch(true);

assertEquals("v1", cache.get("k1"));
assertEquals("v2", cache.get("k2"));

cache.startBatch();
cache.putForExternalRead("k3", "v3");
cache.put("k1", "v2");
cache.endBatch(false);

assertEquals("v1", cache.get("k1"));
assertEquals("v2", cache.get("k2"));
assertEquals("v3", cache.get("k3"));
}

String getOnDifferentThread(final Cache<String, String> cache, final String key) throws Exception {
Future<String> f = fork(() -> {
cache.startBatch();
String v = cache.get(key);
cache.endBatch(true);
return v;
});
return f.get();
}

void assertNoTransaction(TransactionManager transactionManager) throws SystemException {
assertNull("Should have no ongoing txs", transactionManager.getTransaction());
}

protected abstract <K, V> Cache<K, V> createCache(String name);
} }
@@ -1,42 +1,28 @@
package org.infinispan.api.batch; package org.infinispan.api.batch;


import static org.infinispan.test.Exceptions.expectException;
import static org.infinispan.test.TestingUtil.getTransactionManager;
import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertNull;


import java.lang.reflect.Method;

import javax.transaction.TransactionManager; import javax.transaction.TransactionManager;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.TransactionMode; import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup; import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.transaction.tm.EmbeddedBaseTransactionManager; import org.infinispan.transaction.tm.EmbeddedBaseTransactionManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;




@Test(groups = {"functional", "transaction"}, testName = "api.batch.BatchWithCustomTMTest") @Test(groups = {"functional", "transaction"}, testName = "api.batch.BatchWithCustomTMTest")
public class BatchWithCustomTMTest extends AbstractBatchTest { public class BatchWithCustomTMTest extends AbstractBatchTest {


private EmbeddedCacheManager cm; public void testBatchWithOngoingTM(Method method) throws Exception {

Cache<String, String> cache = createCache(method.getName());
@BeforeClass TransactionManager tm = getTransactionManager(cache);
public void createCacheManager() {
cm = TestCacheManagerFactory.createCacheManager(false);
}

@AfterClass
public void destroyCacheManager() {
TestingUtil.killCacheManagers(cm);
cm = null;
}

public void testBatchWithOngoingTM() throws Exception {
Cache<String, String> cache =createCache("testBatchWithOngoingTM");
TransactionManager tm = TestingUtil.getTransactionManager(cache);
assertEquals(MyDummyTransactionManager.class, tm.getClass()); assertEquals(MyDummyTransactionManager.class, tm.getClass());
tm.begin(); tm.begin();
cache.put("k", "v"); cache.put("k", "v");
Expand All @@ -52,28 +38,23 @@ public void testBatchWithOngoingTM() throws Exception {
assertEquals("v2", cache.get("k2")); assertEquals("v2", cache.get("k2"));
} }


public void testBatchWithoutOngoingTMSuspension() throws Exception { public void testBatchWithoutOngoingTMSuspension(Method method) throws Exception {
Cache<String, String> cache = createCache("testBatchWithoutOngoingTMSuspension"); Cache<String, String> cache = createCache(method.getName());
TransactionManager tm = TestingUtil.getTransactionManager(cache); TransactionManager tm = getTransactionManager(cache);
assertEquals(MyDummyTransactionManager.class, tm.getClass()); assertEquals(MyDummyTransactionManager.class, tm.getClass());
assertNull("Should have no ongoing txs", tm.getTransaction()); assertNoTransaction(tm);
cache.startBatch(); cache.startBatch();


cache.put("k", "v"); cache.put("k", "v");
assertNull("Should have no ongoing txs", tm.getTransaction()); assertNoTransaction(tm);
cache.put("k2", "v2"); cache.put("k2", "v2");


assertNull(getOnDifferentThread(cache, "k")); assertNull(getOnDifferentThread(cache, "k"));
assertNull(getOnDifferentThread(cache, "k2")); assertNull(getOnDifferentThread(cache, "k2"));


try { expectException(IllegalStateException.class, tm::commit);
tm.commit(); // should have no effect
}
catch (Exception e) {
// the TM may barf here ... this is OK.
}


assertNull("Should have no ongoing txs", tm.getTransaction()); assertNoTransaction(tm);


assertNull(getOnDifferentThread(cache, "k")); assertNull(getOnDifferentThread(cache, "k"));
assertNull(getOnDifferentThread(cache, "k2")); assertNull(getOnDifferentThread(cache, "k2"));
Expand All @@ -84,8 +65,8 @@ public void testBatchWithoutOngoingTMSuspension() throws Exception {
assertEquals("v2", getOnDifferentThread(cache, "k2")); assertEquals("v2", getOnDifferentThread(cache, "k2"));
} }


public void testBatchRollback() throws Exception { public void testBatchRollback(Method method) throws Exception {
Cache<String, String> cache = createCache("testBatchRollback"); Cache<String, String> cache = createCache(method.getName());
cache.startBatch(); cache.startBatch();
cache.put("k", "v"); cache.put("k", "v");
cache.put("k2", "v2"); cache.put("k2", "v2");
Expand All @@ -99,20 +80,20 @@ public void testBatchRollback() throws Exception {
assertNull(getOnDifferentThread(cache, "k2")); assertNull(getOnDifferentThread(cache, "k2"));
} }


private Cache<String, String> createCache(String name) { protected <K, V> Cache<K, V> createCache(String name) {
ConfigurationBuilder c = new ConfigurationBuilder(); ConfigurationBuilder c = new ConfigurationBuilder();
c.transaction().transactionManagerLookup(new MyDummyTransactionManagerLookup()); c.transaction().transactionManagerLookup(new MyDummyTransactionManagerLookup());
c.invocationBatching().enable(); c.invocationBatching().enable();
c.transaction().transactionMode(TransactionMode.TRANSACTIONAL); c.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
cm.defineConfiguration(name, c.build()); cacheManager.defineConfiguration(name, c.build());
return cm.getCache(name); return cacheManager.getCache(name);
} }


static class MyDummyTransactionManagerLookup extends EmbeddedTransactionManagerLookup { static class MyDummyTransactionManagerLookup extends EmbeddedTransactionManagerLookup {
MyDummyTransactionManager tm = new MyDummyTransactionManager(); MyDummyTransactionManager tm = new MyDummyTransactionManager();


@Override @Override
public TransactionManager getTransactionManager() throws Exception { public TransactionManager getTransactionManager() {
return tm; return tm;
} }
} }
Expand Down

0 comments on commit 648100e

Please sign in to comment.