Skip to content

Commit

Permalink
ISPN-7598 Clean up tests after fixing thread leaks
Browse files Browse the repository at this point in the history
* Use the diamond operator and lambdas where possible
* Use AssertJUnit.assertEquals instead of the assert keyword
* Use Exceptions.expectException instead of @test(expectedExceptions)
* Use TestException instead of ArithmeticException
* Remove the unstable group from ReplSyncDistributedExecutorTest.
  testBasicTargetRemoteDistributedCallableWithHighFutureAndLowTaskTimeout
  • Loading branch information
danberindei authored and tristantarrant committed Apr 28, 2017
1 parent daf9dc1 commit ca9045d
Show file tree
Hide file tree
Showing 22 changed files with 386 additions and 480 deletions.
@@ -1,5 +1,7 @@
package org.infinispan.distexec; package org.infinispan.distexec;


import static org.infinispan.test.Exceptions.expectException;

import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
Expand All @@ -16,6 +18,7 @@
import org.infinispan.marshall.core.ExternalPojo; import org.infinispan.marshall.core.ExternalPojo;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest; import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil; import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.WithinThreadExecutor; import org.infinispan.util.concurrent.WithinThreadExecutor;
Expand All @@ -34,28 +37,24 @@ public class BasicDistributedExecutorTest extends AbstractCacheTest {
public BasicDistributedExecutorTest() { public BasicDistributedExecutorTest() {
} }


@Test(expectedExceptions = { IllegalArgumentException.class })
public void testImproperMasterCacheForDistributedExecutor() { public void testImproperMasterCacheForDistributedExecutor() {
DistributedExecutorService des = new DefaultExecutorService(null); expectException(IllegalArgumentException.class, () -> new DefaultExecutorService(null));

} }


@Test(expectedExceptions = { IllegalArgumentException.class })
public void testImproperLocalExecutorServiceForDistributedExecutor() { public void testImproperLocalExecutorServiceForDistributedExecutor() {
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();
DistributedExecutorService des = new DefaultExecutorService(cache, null); expectException(IllegalArgumentException.class, () -> new DefaultExecutorService(cache, null));
} finally { } finally {
TestingUtil.killCacheManagers(cacheManager); TestingUtil.killCacheManagers(cacheManager);
} }
} }


@Test(expectedExceptions = { IllegalArgumentException.class })
public void testStoppedLocalExecutorServiceForDistributedExecutor() throws ExecutionException, InterruptedException { public void testStoppedLocalExecutorServiceForDistributedExecutor() throws ExecutionException, InterruptedException {
ExecutorService service = new WithinThreadExecutor(); ExecutorService service = new WithinThreadExecutor();
service.shutdown(); service.shutdown();
customExecutorServiceDistributedExecutorTest(service, false); expectException(IllegalArgumentException.class, () -> customExecutorServiceDistributedExecutorTest(service, false));
} }


public void testDistributedExecutorWithPassedThreadExecutorOwnership() throws ExecutionException, InterruptedException { public void testDistributedExecutorWithPassedThreadExecutorOwnership() throws ExecutionException, InterruptedException {
Expand All @@ -73,20 +72,19 @@ public void testDistributedExecutorWithManagedExecutorService() throws Execution
customExecutorServiceDistributedExecutorTest(service, false); customExecutorServiceDistributedExecutorTest(service, false);
} }


@Test(expectedExceptions = { IllegalArgumentException.class })
public void testDistributedExecutorWithManagedExecutorServiceOwnership() throws ExecutionException, InterruptedException { public void testDistributedExecutorWithManagedExecutorServiceOwnership() throws ExecutionException, InterruptedException {
ExecutorService service = new ManagedExecutorServicesEmulator(); ExecutorService service = new ManagedExecutorServicesEmulator();
customExecutorServiceDistributedExecutorTest(service, true); expectException(IllegalArgumentException.class, () -> customExecutorServiceDistributedExecutorTest(service, true));
} }


private void customExecutorServiceDistributedExecutorTest(ExecutorService service, boolean overrideOwnership) throws ExecutionException, InterruptedException { private void customExecutorServiceDistributedExecutorTest(ExecutorService service, boolean overrideOwnership) throws ExecutionException, InterruptedException {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
config.clustering().cacheMode(CacheMode.REPL_SYNC); config.clustering().cacheMode(CacheMode.REPL_SYNC);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
DistributedExecutorService des = null;
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();


DistributedExecutorService des;
if (overrideOwnership) if (overrideOwnership)
des = new DefaultExecutorService(cache, service, true); des = new DefaultExecutorService(cache, service, true);
else else
Expand All @@ -100,15 +98,14 @@ private void customExecutorServiceDistributedExecutorTest(ExecutorService servic
} }
} }


@Test(expectedExceptions = { IllegalStateException.class })
public void testStoppedCacheForDistributedExecutor() { public void testStoppedCacheForDistributedExecutor() {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
config.clustering().cacheMode(CacheMode.REPL_SYNC); config.clustering().cacheMode(CacheMode.REPL_SYNC);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();
cache.stop(); cache.stop();
DistributedExecutorService des = new DefaultExecutorService(cache); expectException(IllegalStateException.class, () -> new DefaultExecutorService(cache));
} finally { } finally {
TestingUtil.killCacheManagers(cacheManager); TestingUtil.killCacheManagers(cacheManager);
} }
Expand All @@ -118,10 +115,9 @@ public void testDistributedExecutorShutDown() {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
config.clustering().cacheMode(CacheMode.REPL_SYNC); config.clustering().cacheMode(CacheMode.REPL_SYNC);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
DistributedExecutorService des = null;
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();
des = new DefaultExecutorService(cache); DistributedExecutorService des = new DefaultExecutorService(cache);
des.shutdown(); des.shutdown();
assert des.isShutdown(); assert des.isShutdown();
assert des.isTerminated(); assert des.isTerminated();
Expand All @@ -134,13 +130,11 @@ public void testDistributedExecutorRealShutdown() {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
config.clustering().cacheMode(CacheMode.REPL_SYNC); config.clustering().cacheMode(CacheMode.REPL_SYNC);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
DistributedExecutorService des = null; ExecutorService service = new WithinThreadExecutor();
ExecutorService service = null;
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();
service = new WithinThreadExecutor();


des = new DefaultExecutorService(cache, service); DistributedExecutorService des = new DefaultExecutorService(cache, service);


des.shutdown(); des.shutdown();


Expand All @@ -157,13 +151,11 @@ public void testDistributedExecutorRealShutdownWithOwnership() {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
config.clustering().cacheMode(CacheMode.REPL_SYNC); config.clustering().cacheMode(CacheMode.REPL_SYNC);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
DistributedExecutorService des = null; ExecutorService service = new WithinThreadExecutor();
ExecutorService service = null;
try { try {
Cache<Object, Object> cache = cacheManager.getCache(); Cache<Object, Object> cache = cacheManager.getCache();
service = new WithinThreadExecutor();


des = new DefaultExecutorService(cache, service, true); DistributedExecutorService des = new DefaultExecutorService(cache, service, true);


des.shutdown(); des.shutdown();


Expand Down Expand Up @@ -197,8 +189,6 @@ public void testDistributedExecutorShutDownNow() {


/** /**
* Tests that we can invoke DistributedExecutorService on an Infinispan cluster having a single node * Tests that we can invoke DistributedExecutorService on an Infinispan cluster having a single node
*
* @throws Exception
*/ */
public void testSingleCacheExecution() throws Exception { public void testSingleCacheExecution() throws Exception {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
Expand Down Expand Up @@ -226,8 +216,6 @@ public void testSingleCacheExecution() throws Exception {
/** /**
* Tests that we can invoke DistributedExecutorService task with keys * Tests that we can invoke DistributedExecutorService task with keys
* https://issues.jboss.org/browse/ISPN-1886 * https://issues.jboss.org/browse/ISPN-1886
*
* @throws Exception
*/ */
public void testSingleCacheWithKeysExecution() throws Exception { public void testSingleCacheWithKeysExecution() throws Exception {
ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true); ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
Expand All @@ -243,8 +231,7 @@ public void testSingleCacheWithKeysExecution() throws Exception {


des = new DefaultExecutorService(c1); des = new DefaultExecutorService(c1);


Future<Boolean> future = des.submit(new SimpleDistributedCallable(true), new String[] { Future<Boolean> future = des.submit(new SimpleDistributedCallable(true), "key1", "key2");
"key1", "key2" });
Boolean r = future.get(); Boolean r = future.get();
assert r; assert r;
} finally { } finally {
Expand Down Expand Up @@ -283,7 +270,7 @@ public int maxFailoverAttempts() {


DistributedTask<Integer> task = taskBuilder.build(); DistributedTask<Integer> task = taskBuilder.build();
AssertJUnit.assertEquals(1, task.getTaskFailoverPolicy().maxFailoverAttempts()); AssertJUnit.assertEquals(1, task.getTaskFailoverPolicy().maxFailoverAttempts());
Future<Integer> val = des.submit(task, new String[] { "key1" }); Future<Integer> val = des.submit(task, "key1");
AssertJUnit.assertEquals(new Integer(1), val.get()); AssertJUnit.assertEquals(new Integer(1), val.get());
} finally { } finally {
if (des != null) des.shutdownNow(); if (des != null) des.shutdownNow();
Expand Down Expand Up @@ -340,8 +327,8 @@ public int maxFailoverAttempts() {
}); });
DistributedTask<Boolean> task = taskBuilder.build(); DistributedTask<Boolean> task = taskBuilder.build();
AssertJUnit.assertEquals(1, task.getTaskFailoverPolicy().maxFailoverAttempts()); AssertJUnit.assertEquals(1, task.getTaskFailoverPolicy().maxFailoverAttempts());
Future<Boolean> val = des.submit(task, new String[] { "key1", "key5" }); Future<Boolean> val = des.submit(task, "key1", "key5");
AssertJUnit.assertEquals(new Boolean(true), val.get()); AssertJUnit.assertEquals(Boolean.TRUE, val.get());
} finally { } finally {
if (des != null) des.shutdownNow(); if (des != null) des.shutdownNow();
TestingUtil.killCacheManagers(cacheManager1); TestingUtil.killCacheManagers(cacheManager1);
Expand Down Expand Up @@ -369,12 +356,11 @@ public void testDistributedCallableEmptyFailoverPolicy() throws Exception {


Future<Integer> f = des.submit(task); Future<Integer> f = des.submit(task);


f.get(); expectException(ExecutionException.class, TestException.class, f::get);
} catch (ExecutionException e) {
// Verify that the distributed executor didn't wrap the exception in too many extra exceptions.
AssertJUnit.assertTrue("Wrong exception: " + e, e.getCause() instanceof ArithmeticException);
} finally { } finally {
if (des != null) des.shutdownNow(); if (des != null)
des.shutdownNow();

TestingUtil.killCacheManagers(cacheManager); TestingUtil.killCacheManagers(cacheManager);
} }
} }
Expand All @@ -401,7 +387,7 @@ public void testDistributedCallableRandomFailoverPolicy() throws Exception {


assert task.getTaskFailoverPolicy().equals(DefaultExecutorService.RANDOM_NODE_FAILOVER); assert task.getTaskFailoverPolicy().equals(DefaultExecutorService.RANDOM_NODE_FAILOVER);


Future<Integer> val = des.submit(task, new String[] {"key1"}); Future<Integer> val = des.submit(task, "key1");


val.get(); val.get();
throw new IllegalStateException("Should have raised exception"); throw new IllegalStateException("Should have raised exception");
Expand Down Expand Up @@ -441,7 +427,7 @@ public void testDistributedCallableRandomFailoverPolicyWith2Nodes() throws Excep


assert task.getTaskFailoverPolicy().equals(DefaultExecutorService.RANDOM_NODE_FAILOVER); assert task.getTaskFailoverPolicy().equals(DefaultExecutorService.RANDOM_NODE_FAILOVER);


Future<Integer> val = des.submit(task, new String[] {"key1"}); Future<Integer> val = des.submit(task, "key1");
val.get(); val.get();
throw new IllegalStateException("Should have thrown exception"); throw new IllegalStateException("Should have thrown exception");
} catch (Exception e){ } catch (Exception e){
Expand All @@ -451,7 +437,9 @@ public void testDistributedCallableRandomFailoverPolicyWith2Nodes() throws Excep
AssertJUnit.assertEquals(false, duplicateEEInChain); AssertJUnit.assertEquals(false, duplicateEEInChain);
} }
finally { finally {
if (des != null) des.shutdownNow(); if (des != null)
des.shutdownNow();

TestingUtil.killCacheManagers(cacheManager, cacheManager1); TestingUtil.killCacheManagers(cacheManager, cacheManager1);
} }
} }
Expand All @@ -461,23 +449,25 @@ public void testBasicTargetLocalDistributedCallableWithoutAnyTimeout() throws Ex
config.clustering().cacheMode(CacheMode.REPL_SYNC).remoteTimeout(0L); config.clustering().cacheMode(CacheMode.REPL_SYNC).remoteTimeout(0L);
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createClusteredCacheManager(config);
EmbeddedCacheManager cacheManager1 = TestCacheManagerFactory.createClusteredCacheManager(config); EmbeddedCacheManager cacheManager1 = TestCacheManagerFactory.createClusteredCacheManager(config);

Cache<Object, Object> cache1 = cacheManager.getCache();
Cache<Object, Object> cache2 = cacheManager1.getCache();
DistributedExecutorService des = null; DistributedExecutorService des = null;
try { try {
Cache<Object, Object> cache1 = cacheManager.getCache();
Cache<Object, Object> cache2 = cacheManager1.getCache();

// initiate task from cache1 and execute on same node // initiate task from cache1 and execute on same node
des = new DefaultExecutorService(cache1); des = new DefaultExecutorService(cache1);
Address target = cache1.getAdvancedCache().getRpcManager().getAddress(); Address target = cache1.getAdvancedCache().getRpcManager().getAddress();


DistributedTaskBuilder builder = des DistributedTaskBuilder<Integer> builder = des
.createDistributedTaskBuilder(new DistributedExecutorTest.SleepingSimpleCallable()); .createDistributedTaskBuilder(new DistributedExecutorTest.SleepingSimpleCallable());


Future<Integer> future = des.submit(target, builder.build()); Future<Integer> future = des.submit(target, builder.build());


AssertJUnit.assertEquals((Integer) 1, future.get()); AssertJUnit.assertEquals((Integer) 1, future.get());
} finally { } finally {
des.shutdown(); if (des != null)
des.shutdownNow();

TestingUtil.killCacheManagers(cacheManager, cacheManager1); TestingUtil.killCacheManagers(cacheManager, cacheManager1);
} }
} }
Expand All @@ -497,14 +487,16 @@ public void testBasicTargetRemoteDistributedCallableWithoutAnyTimeout() throws E
des = new DefaultExecutorService(cache1); des = new DefaultExecutorService(cache1);
Address target = cache2.getAdvancedCache().getRpcManager().getAddress(); Address target = cache2.getAdvancedCache().getRpcManager().getAddress();


DistributedTaskBuilder builder = des DistributedTaskBuilder<Integer> builder = des
.createDistributedTaskBuilder(new DistributedExecutorTest.SleepingSimpleCallable()); .createDistributedTaskBuilder(new DistributedExecutorTest.SleepingSimpleCallable());


Future<Integer> future = des.submit(target, builder.build()); Future<Integer> future = des.submit(target, builder.build());


AssertJUnit.assertEquals((Integer) 1, future.get()); AssertJUnit.assertEquals((Integer) 1, future.get());
} finally { } finally {
des.shutdown(); if (des != null)
des.shutdown();

TestingUtil.killCacheManagers(cacheManager, cacheManager1); TestingUtil.killCacheManagers(cacheManager, cacheManager1);
} }
} }
Expand Down Expand Up @@ -542,7 +534,7 @@ public int maxFailoverAttempts() {
DistributedTask<Integer> task = taskBuilder.build(); DistributedTask<Integer> task = taskBuilder.build();
assert task.getTaskFailoverPolicy().maxFailoverAttempts() == 0; assert task.getTaskFailoverPolicy().maxFailoverAttempts() == 0;


Future<Integer> val = des.submit(task, new String[] {"key1"}); Future<Integer> val = des.submit(task, "key1");
val.get(); val.get();
throw new IllegalStateException("Should have thrown exception"); throw new IllegalStateException("Should have thrown exception");
} catch (Exception e) { } catch (Exception e) {
Expand Down Expand Up @@ -592,9 +584,6 @@ static class SimpleCallable implements Callable<Integer>, Serializable, External
/** The serialVersionUID */ /** The serialVersionUID */
private static final long serialVersionUID = -8589149500259272402L; private static final long serialVersionUID = -8589149500259272402L;


public SimpleCallable() {
}

@Override @Override
public Integer call() throws Exception { public Integer call() throws Exception {
return 1; return 1;
Expand All @@ -607,17 +596,13 @@ static class FailOnlyOnceCallable implements Callable<Integer>, Serializable, Ex
private static final long serialVersionUID = 3961940091247573385L; private static final long serialVersionUID = 3961940091247573385L;
boolean throwException = true; boolean throwException = true;


public FailOnlyOnceCallable() {
super();
}

@Override @Override
public Integer call() throws Exception { public Integer call() throws Exception {
if (throwException) { if (throwException) {
// do to not throw the exception 2nd time during retry. // do to not throw the exception 2nd time during retry.
throwException = false; throwException = false;
// now throw exception for the first run // now throw exception for the first run
int a = 5 / 0; throw new TestException();
} }
return 1; return 1;
} }
Expand All @@ -638,7 +623,7 @@ public Boolean call() throws Exception {
if(throwException) { if(throwException) {
throwException = false; throwException = false;


int a = 5 / 0; throw new TestException();
} }


return true; return true;
Expand All @@ -650,15 +635,9 @@ static class ExceptionThrowingCallable implements Callable<Integer>, Serializabl
/** The serialVersionUID */ /** The serialVersionUID */
private static final long serialVersionUID = -8589149500259272402L; private static final long serialVersionUID = -8589149500259272402L;


public ExceptionThrowingCallable() {
}

@Override @Override
public Integer call() throws Exception { public Integer call() throws Exception {
//simulating ArithmeticException throw new TestException();
int a = 5 / 0;

return 1;
} }
} }
} }

0 comments on commit ca9045d

Please sign in to comment.