Skip to content

Commit

Permalink
ISPN-7141 LimitedExecutorTest.testConcurrencyLimit random failures
Browse files Browse the repository at this point in the history
* Increase the thread pool size to 2 for testConcurrencyLimit
* Wait for the executor to settle before/after each test
  • Loading branch information
danberindei committed Nov 11, 2016
1 parent cc4aae7 commit 946f3fa
Showing 1 changed file with 49 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package org.infinispan.executors;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestException;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
Expand All @@ -21,12 +18,13 @@
* Basic tests for {@link LimitedExecutor}
*
* @author Dan Berindei
* @since 9.0
*/
@Test(groups = "functional", testName = "executors.LimitedExecutorTest")
public class LimitedExecutorTest extends AbstractInfinispanTest {
public static final String NAME = "Test";
private final ExecutorService executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2,
0L, MILLISECONDS,
new SynchronousQueue<>(),
getTestThreadFactory(NAME));

Expand All @@ -43,51 +41,72 @@ public void testConcurrency1WithinThread() throws Exception {
assertEquals("bla", cf.get());
}

/**
* Test that no more than 1 task runs at a time.
*/
public void testConcurrencyLimit() throws Exception {
eventuallyEquals(0, executor::getActiveCount);
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> blocker2 = new CompletableFuture<>();

CompletableFuture<String> cf1 = new CompletableFuture<>();
limitedExecutor.execute(() -> {
awaitLatch(latch);
cf1.complete("bla");
try {
cf1.complete(blocker1.get(10, SECONDS));
} catch (Exception e) {
cf1.completeExceptionally(e);
}
});

CompletableFuture<String> cf2 = new CompletableFuture<>();
limitedExecutor.execute(() -> cf2.complete("bla"));
limitedExecutor.execute(() -> {
try {
cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10, SECONDS));
} catch (Exception e) {
cf2.completeExceptionally(e);
}
});

Thread.sleep(10);
assertFalse(cf1.isDone());
assertFalse(cf2.isDone());

latch.countDown();
assertEquals("bla", cf1.get(10, SECONDS));
assertEquals("bla", cf2.get(10, SECONDS));
}

private void awaitLatch(CountDownLatch latch1) {
try {
latch1.await(30, SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TestException(e);
}
blocker1.complete("value1");
blocker2.complete("value2");
assertEquals("value1", cf1.get(10, SECONDS));
assertEquals("value1 value2", cf2.get(10, SECONDS));
eventuallyEquals(0, executor::getActiveCount);
}

/**
* Test that an async task ({@code executeAsync()}) will block another task from running
* until its {@code CompletableFuture} is completed.
*/
public void testAsyncTasks() throws Exception {
eventuallyEquals(0, executor::getActiveCount);
LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
CompletableFuture<String> blocker1 = new CompletableFuture<>();
CompletableFuture<String> blocker2 = new CompletableFuture<>();

CompletableFuture<Void> blockingFuture = new CompletableFuture<>();
CompletableFuture<String> cf1 = new CompletableFuture<>();
limitedExecutor.executeAsync(() -> blockingFuture.thenRunAsync(() -> cf1.complete("bla")));
limitedExecutor.executeAsync(() -> blocker1.thenAccept(cf1::complete));

CompletableFuture<String> cf2 = new CompletableFuture<>();
limitedExecutor.execute(() -> cf2.complete("bla"));
limitedExecutor.execute(() -> {
try {
cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10, SECONDS));
} catch (Exception e) {
cf2.completeExceptionally(e);
}
});

Thread.sleep(10);
assertFalse(cf1.isDone());
assertFalse(cf2.isDone());

blockingFuture.complete(null);
assertEquals("bla", cf1.get(10, SECONDS));
assertEquals("bla", cf2.get(10, SECONDS));
blocker1.complete("value1");
blocker2.complete("value2");
assertEquals("value1", cf1.get(10, SECONDS));
assertEquals("value1 value2", cf2.get(10, SECONDS));
eventuallyEquals(0, executor::getActiveCount);
}
}

0 comments on commit 946f3fa

Please sign in to comment.