Permalink
Browse files

ISPN-2962 Fix thread leaks in the core test suite

* Several tests do not shutdown their Executors or fail to stop an AsyncStore (notably AsyncStoreTest and AsyncStoreStressTest), leading to stray threads after the suite is run.
* DistributedExecutorTest and ReplSyncDistributedExecutorTest ocasionally swallow an interrupt preventing the pooled thread to stop when it is returned back to the pool and tries to take() the next task to execute.
* Print all the threads at the end of the test suite that were not already present at the beginning. These are leak suspects.
  • Loading branch information...
1 parent 77eec93 commit 4def486eebb92f2ef410ef5ffd5d227051464197 @anistor anistor committed with danberindei Mar 25, 2013
@@ -24,7 +24,6 @@
import org.infinispan.Cache;
import org.infinispan.distribution.BaseDistFunctionalTest;
-import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.NotifyingFuture;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -122,13 +122,17 @@ private void executeDifferentExecutionPolicies(DistributedTaskExecutionPolicy po
//initiate task from cache1 and select cache2 as target
DistributedExecutorService des = new DefaultExecutorService(cache1);
- //the same using DistributedTask API
- DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SimpleDistributedCallable(true));
- taskBuilder.executionPolicy(policy);
+ try {
+ //the same using DistributedTask API
+ DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new LocalDistributedExecutorTest.SimpleDistributedCallable(true));
+ taskBuilder.executionPolicy(policy);
- DistributedTask<Boolean> distributedTask = taskBuilder.build();
- Future<Boolean> future = des.submit(distributedTask, new String[] {"key1", "key6"});
+ DistributedTask<Boolean> distributedTask = taskBuilder.build();
+ Future<Boolean> future = des.submit(distributedTask, new String[] {"key1", "key6"});
- assert future.get();
+ assert future.get();
+ } finally {
+ des.shutdownNow();
+ }
}
}
@@ -51,7 +51,7 @@
@Test(groups = "functional", testName = "distexec.DistributedExecutorTest")
public class DistributedExecutorTest extends LocalDistributedExecutorTest {
- public static AtomicInteger counter = new AtomicInteger();
+ private static AtomicInteger counter = new AtomicInteger();
protected boolean supportsConcurrentUpdates = true;
@@ -360,6 +360,7 @@ public Integer call() throws Exception {
try {
latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
//interrupted successfully, increase counter
counter.incrementAndGet();
}
@@ -51,7 +51,7 @@ protected void createCacheManagers() throws Throwable {
@AfterMethod
public void shutDownDistributedExecutorService() {
if (cleanupService != null) {
- cleanupService.shutdown();
+ cleanupService.shutdownNow();
} else {
log.warn("Should have shutdown DistributedExecutorService but none was set");
}
@@ -44,7 +44,7 @@
@Test(groups = "functional", testName = "distexec.ReplSyncDistributedExecutorTest")
public class ReplSyncDistributedExecutorTest extends DistributedExecutorTest {
- public static AtomicInteger ReplSyncDistributedExecutorTestCancelCounter = new AtomicInteger();
+ private static AtomicInteger ReplSyncDistributedExecutorTestCancelCounter = new AtomicInteger();
public ReplSyncDistributedExecutorTest() {
cleanup = CleanupPhase.AFTER_METHOD;
@@ -82,7 +82,7 @@ public boolean isSatisfied() throws Exception {
boolean taskCancelled = false;
try {
future.get();
- } catch (Exception e) {
+ } catch (Exception e) {
taskCancelled = e instanceof CancellationException;
}
assert taskCancelled : "Dist task not cancelled";
@@ -94,6 +94,7 @@ public boolean isSatisfied() throws Exception {
boolean canceled = future.cancel(true);
assert !canceled;
}
+
static class MyLongRunningCallable implements Callable<Integer>, Serializable {
/** The serialVersionUID */
@@ -106,6 +107,7 @@ public Integer call() throws Exception {
try {
latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
ReplSyncDistributedExecutorTestCancelCounter.incrementAndGet();
}
return 1;
@@ -36,8 +36,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -84,11 +82,9 @@ public void testInMemoryEntryNotLostWithConcurrentActivePassive() throws Excepti
assertEquals(0, activation.getActivationCount());
assertEquals(0, passivation.getPassivationCount());
- ExecutorService exec = Executors.newFixedThreadPool(2);
-
// 2. Add another entry and block just after passivation has stored
// entry in cache store, but it's still in memory
- Future<Object> passivatorFuture = exec.submit(new Callable<Object>() {
+ Future<Object> passivatorFuture = fork(new Callable<Object>() {
@Override
public Object call() throws Exception {
// Store a second entry to force the previous entry
@@ -101,7 +97,7 @@ public Object call() throws Exception {
// 3. Retrieve entry to be passivated from memory
// and let it remove it from cache store
- Future<Object> activatorFuture = exec.submit(new Callable<Object>() {
+ Future<Object> activatorFuture = fork(new Callable<Object>() {
@Override
public Object call() throws Exception {
// Retrieve first key forcing an activation
@@ -67,21 +67,15 @@
@Test(groups = "unit", testName = "loaders.decorators.AsyncStoreTest", sequential=true)
public class AsyncStoreTest extends AbstractInfinispanTest {
private static final Log log = LogFactory.getLog(AsyncStoreTest.class);
- AsyncStore store;
- ExecutorService asyncExecutor;
- DummyInMemoryCacheStore underlying;
- AsyncStoreConfig asyncConfig;
- DummyInMemoryCacheStore.Cfg dummyCfg;
-
- @BeforeMethod
- public void setUp() throws CacheLoaderException {
- underlying = new DummyInMemoryCacheStore();
- asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
+ private AsyncStore store;
+
+ private void createStore() throws CacheLoaderException {
+ DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+ AsyncStoreConfig asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
store = new AsyncStore(underlying, asyncConfig);
- dummyCfg = new DummyInMemoryCacheStore.Cfg().storeName(AsyncStoreTest.class.getName());
+ DummyInMemoryCacheStore.Cfg dummyCfg = new DummyInMemoryCacheStore.Cfg().storeName(AsyncStoreTest.class.getName());
store.init(dummyCfg, null, null);
store.start();
- asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
}
@AfterMethod(alwaysRun = true)
@@ -92,6 +86,8 @@ public void tearDown() throws CacheLoaderException {
@Test(timeOut=10000)
public void testPutRemove() throws Exception {
TestCacheManagerFactory.backgroundTestStarted(this);
+ createStore();
+
final int number = 1000;
String key = "testPutRemove-k-";
String value = "testPutRemove-v-";
@@ -102,6 +98,8 @@ public void testPutRemove() throws Exception {
@Test(timeOut=10000)
public void testPutClearPut() throws Exception {
TestCacheManagerFactory.backgroundTestStarted(this);
+ createStore();
+
final int number = 1000;
String key = "testPutClearPut-k-";
String value = "testPutClearPut-v-";
@@ -115,6 +113,8 @@ public void testPutClearPut() throws Exception {
@Test(timeOut=10000)
public void testMultiplePutsOnSameKey() throws Exception {
TestCacheManagerFactory.backgroundTestStarted(this);
+ createStore();
+
final int number = 1000;
String key = "testMultiplePutsOnSameKey-k";
String value = "testMultiplePutsOnSameKey-v-";
@@ -125,6 +125,8 @@ public void testMultiplePutsOnSameKey() throws Exception {
@Test(timeOut=10000)
public void testRestrictionOnAddingToAsyncQueue() throws Exception {
TestCacheManagerFactory.backgroundTestStarted(this);
+ createStore();
+
store.remove("blah");
final int number = 10;
@@ -153,9 +155,10 @@ public void testThreadSafetyWritingDiffValuesForKey(Method m) throws Exception {
final CountDownLatch v2Latch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(1);
DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+ AsyncStoreConfig asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig);
- dummyCfg = new DummyInMemoryCacheStore.Cfg();
- dummyCfg.setStoreName(m.getName());
+ DummyInMemoryCacheStore.Cfg dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.storeName(m.getName());
store.init(dummyCfg, null, null);
store.start();
@@ -182,6 +185,7 @@ public void testTransactionalModificationsHappenInDiffThread(Method m) throws Ex
final ConcurrentMap<Object, Modification> localMods = new ConcurrentHashMap<Object, Modification>();
final CyclicBarrier barrier = new CyclicBarrier(2);
DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+ AsyncStoreConfig asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
store = new AsyncStore(underlying, asyncConfig) {
@Override
protected void applyModificationsSync(List<Modification> mods) throws CacheLoaderException {
@@ -209,8 +213,8 @@ private Object getKey(Modification modification) {
}
}
};
- dummyCfg = new DummyInMemoryCacheStore.Cfg();
- dummyCfg.setStoreName(m.getName());
+ DummyInMemoryCacheStore.Cfg dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.storeName(m.getName());
store.init(dummyCfg, null, null);
store.start();
@@ -270,6 +274,7 @@ public void clear() {
clearCount.getAndIncrement();
}
};
+ AsyncStoreConfig asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
store = new AsyncStore(underlying, asyncConfig) {
@Override
protected void applyModificationsSync(List<Modification> mods)
@@ -285,8 +290,8 @@ protected void applyModificationsSync(List<Modification> mods)
}
}
};
- dummyCfg = new DummyInMemoryCacheStore.Cfg();
- dummyCfg.setStoreName(m.getName());
+ DummyInMemoryCacheStore.Cfg dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.storeName(m.getName());
store.init(dummyCfg, null, null);
store.start();
@@ -555,6 +560,7 @@ public boolean remove(Object key) {
public void testModificationQueueSize(final Method m) throws Exception {
LockableCacheStore underlying = new LockableCacheStore();
+ AsyncStoreConfig asyncConfig = new AsyncStoreConfig().threadPoolSize(10);
asyncConfig.modificationQueueSize(10);
store = new AsyncStore(underlying, asyncConfig);
store.init(new LockableCacheStoreConfig(), null, null);
@@ -58,6 +58,7 @@
import org.infinispan.util.logging.LogFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -89,6 +90,15 @@
private static final Log log = LogFactory.getLog(StateConsumerTest.class);
+ private ExecutorService pooledExecutorService;
+
+ @AfterMethod
+ public void tearDown() {
+ if (pooledExecutorService != null) {
+ pooledExecutorService.shutdownNow();
+ }
+ }
+
public void test1() throws Exception {
// create cache configuration
ConfigurationBuilder cb = new ConfigurationBuilder();
@@ -131,7 +141,7 @@ public Thread newThread(Runnable r) {
}
};
- ExecutorService pooledExecutorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
+ pooledExecutorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
StateTransferManager stateTransferManager = mock(StateTransferManager.class);
@@ -33,28 +33,22 @@
import org.infinispan.loaders.decorators.AsyncStoreConfig;
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
import org.infinispan.marshall.TestObjectStreamMarshaller;
-import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.locks.containers.LockContainer;
import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import static java.lang.Math.sqrt;
+import static org.junit.Assert.assertTrue;
/**
* Async store stress test.
@@ -124,7 +118,7 @@ private DummyInMemoryCacheStore createBackendStore(String storeName) throws Cach
};
}
- @Test(dataProvider = "readWriteRemove", enabled = true)
+ @Test(dataProvider = "readWriteRemove")
public void testReadWriteRemove(int capacity, int numKeys,
int readerThreads, int writerThreads, int removerThreads) throws Exception {
System.out.printf("Testing independent read/write/remove performance " +
@@ -134,12 +128,24 @@ public void testReadWriteRemove(int capacity, int numKeys,
generateKeyList(numKeys);
Map<String, AbstractDelegatingStore> stores = createAsyncStores();
-
- for (Map.Entry<String, AbstractDelegatingStore> e : stores.entrySet()) {
- mapTestReadWriteRemove(e.getKey(), e.getValue(), numKeys,
- readerThreads, writerThreads, removerThreads);
- e.setValue(null);
+ try {
+ for (Map.Entry<String, AbstractDelegatingStore> e : stores.entrySet()) {
+ mapTestReadWriteRemove(e.getKey(), e.getValue(), numKeys,
+ readerThreads, writerThreads, removerThreads);
+ e.setValue(null);
+ }
+ } finally {
+ for (Iterator<AbstractDelegatingStore> it = stores.values().iterator(); it.hasNext(); ) {
+ AbstractDelegatingStore store = it.next();
+ try {
+ store.stop();
+ it.remove();
+ } catch (Exception ex) {
+ log.error("Failed to stop cache store", ex);
+ }
+ }
}
+ assertTrue("Not all stores were properly shut down", stores.isEmpty());
}
private void mapTestReadWriteRemove(String name, AbstractDelegatingStore store,
@@ -73,11 +73,11 @@ private void checkCleanedUp(ITestContext testCxt) {
}
public static void describeErrorsIfAny() {
- if ( ! failedTestDescriptions.isEmpty() ) {
+ if (!failedTestDescriptions.isEmpty()) {
log("~~~~~~~~~~~~~~~~~~~~~~~~~ TEST HEALTH INFO ~~~~~~~~~~~~~~~~~~~~~~~~~~");
log("Some tests didn't properly shutdown the CacheManager:");
for (String errorMsg : failedTestDescriptions) {
- System.out.println( "\t" + errorMsg);
+ log("\t" + errorMsg);
}
log("~~~~~~~~~~~~~~~~~~~~~~~~~ TEST HEALTH INFO ~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
Oops, something went wrong.

0 comments on commit 4def486

Please sign in to comment.