diff --git a/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java b/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java
index 39299ce9..47c2acfe 100644
--- a/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java
+++ b/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java
@@ -15,16 +15,334 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.jboss.threads;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Tests for checking EnhancedTreadPoolExecutor
+ *
+ */
public class EnhancedThreadQueueExecutorTestCase {
+ private int coreSize = 4;
+ private int maxSize = 7;
+ private long keepaliveTimeMillis = 1000;
+ private long defaultWaitTimeout = 5000;
+
+ class TestTask implements Runnable {
+ CountDownLatch exitLatch;
+ CountDownLatch allThreadsRunningLatch;
+
+ private TestTask(CountDownLatch exitLatch, CountDownLatch allThreadsRunningLatch) {
+ this.exitLatch = exitLatch;
+ this.allThreadsRunningLatch = allThreadsRunningLatch;
+ }
+
+ @Override
+ public void run() {
+ try {
+ allThreadsRunningLatch.countDown();
+ exitLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Test invalid values:
+ * * Negative keepAlive, coreSize, maxSize
+ * * maxSize > coreSize
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidValuesKeepAliveZero() {
+ new EnhancedQueueExecutor.Builder()
+ .setKeepAliveTime(0, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidValuesKeepAliveNegative() {
+ new EnhancedQueueExecutor.Builder()
+ .setKeepAliveTime(-3456, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidValuesCoreSizeNegative() {
+ new EnhancedQueueExecutor.Builder()
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(-5)
+ .setMaximumPoolSize(maxSize)
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidValuesMaxSizeNegative() {
+ new EnhancedQueueExecutor.Builder()
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(-3)
+ .build();
+ }
+
+ @Test
+ public void testCoreSizeBiggerThanMaxSize() {
+ int expectedCorePoolSize = 5;
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(2 * expectedCorePoolSize)
+ .setMaximumPoolSize(expectedCorePoolSize)
+ .build();
+ Assert.assertEquals("Core size should be automatically adjusted to be equal to max size in case it's bigger.", expectedCorePoolSize, executor.getCorePoolSize());
+ }
+
+ /**
+ * Test that unused threads are being reused. Scenario:
+ *
+ * - max threads = 2x, core threads = x
+ * - schedule x tasks, wait for tasks to finish
+ * - schedule x tasks, expect pool size = x immediately after
+ *
+ */
+ @Test
+ public void testThreadReuse() throws TimeoutException, InterruptedException {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+
+ CountDownLatch exitLatch = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
+
+ for (int i = 0; i < coreSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ exitLatch.countDown();
+ waitForActiveCount(executor, 0, defaultWaitTimeout);
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ exitLatch = new CountDownLatch(1);
+ allThreadsRunningLatch = new CountDownLatch(coreSize);
+ for (int i = 0; i < coreSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ exitLatch.countDown();
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ executor.shutdown();
+ }
+
+ /**
+ * Test thread reuse above core size
+ * Scenario:
+ *
+ * - setKeepAlive=60 sec
+ * - max threads = 2x, core threads = x
+ * - schedule x tasks and wait to occupy all core threads
+ * - schedule one more task and let it finish
+ * - schedule one task and check that pool size is still x+1
+ *
+ */
+ @Test
+ public void testThreadReuseAboveCoreSize() throws TimeoutException, InterruptedException {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(60000, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+
+ // submit 3 tasks to fill core size
+ CountDownLatch exitLatch = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
+ for (int i = 0; i < coreSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+
+ // submit one more task and allow it to finish
+ CountDownLatch singleExitLatch = new CountDownLatch(1);
+ CountDownLatch threadRunningLatch = new CountDownLatch(1);
+ executor.execute(new TestTask(singleExitLatch, threadRunningLatch));
+ threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS);
+ waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout);
+ singleExitLatch.countDown();
+ waitForActiveCount(executor, coreSize, defaultWaitTimeout);
+
+ // now there are just core threads and one free thread, submit another task and check it's reused
+ singleExitLatch = new CountDownLatch(1);
+ threadRunningLatch = new CountDownLatch(1);
+ executor.execute(new TestTask(singleExitLatch, threadRunningLatch));
+ threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS);
+ waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout);
+ singleExitLatch.countDown();
+
+ // finish all
+ exitLatch.countDown();
+ executor.shutdown();
+ }
+
+ /**
+ * Test that keepalive time is honored and threads above the core count are being removed when no tasks are
+ * available.
+ *
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testKeepaliveTime() throws TimeoutException, InterruptedException {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .allowCoreThreadTimeOut(false)
+ .build();
+
+ CountDownLatch exitLatch = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
+ for (int i = 0; i < coreSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ Assert.assertTrue("Not all core threads are running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ CountDownLatch exitLatch2 = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch2 = new CountDownLatch(maxSize - coreSize);
+ for (int i = 0; i < (maxSize - coreSize); i++) {
+ executor.execute(new TestTask(exitLatch2, allThreadsRunningLatch2));
+ }
+ Assert.assertTrue("Not all above core threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch2.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ waitForPoolSize(executor, maxSize, defaultWaitTimeout);
+
+ // finish core tasks and let timeout "core" threads
+ exitLatch.countDown();
+ waitForActiveCount(executor, maxSize - coreSize, defaultWaitTimeout);
+ waitForPoolSize(executor, Math.max(coreSize, (maxSize - coreSize)), defaultWaitTimeout);
+ exitLatch2.countDown();
+ waitForActiveCount(executor, 0, defaultWaitTimeout);
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ executor.shutdown();
+ }
+
+ /**
+ * Test that keepalive time is ignored when core threads are the same as max
+ * threads and core thread time out is disabled.
+ */
+ @Test
+ public void testKeepaliveTime2() throws TimeoutException, InterruptedException {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(coreSize)
+ .build();
+
+ CountDownLatch exitLatch = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
+
+ for (int i = 0; i < coreSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ exitLatch.countDown();
+ waitForActiveCount(executor, 0, defaultWaitTimeout);
+ waitForPoolSize(executor, coreSize, defaultWaitTimeout);
+ executor.shutdown();
+ }
+
+ /**
+ * Test the keepalive setting with core thread time out enabled.
+ */
+ @Test
+ public void testKeepaliveTimeWithCoreThreadTimeoutEnabled() throws TimeoutException, InterruptedException {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .allowCoreThreadTimeOut(true)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+
+ CountDownLatch exitLatch = new CountDownLatch(1);
+ CountDownLatch allThreadsRunningLatch = new CountDownLatch(maxSize);
+
+ for (int i = 0; i < maxSize; i++) {
+ executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
+ }
+ // this will make sure that all thread are running at the same time
+ Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
+ allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
+ exitLatch.countDown();
+ waitForActiveCount(executor, 0, defaultWaitTimeout);
+ waitForPoolSize(executor, 0, defaultWaitTimeout);
+ executor.shutdown();
+ }
+
+ /**
+ * Tests that prestarting core threads starts exactly the core threads amount specified.
+ */
+ @Test
+ public void testPrestartCoreThreads() {
+ EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
+ .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
+ .setCorePoolSize(coreSize)
+ .setMaximumPoolSize(maxSize)
+ .build();
+ int prestarted = executor.prestartAllCoreThreads();
+ Assert.assertEquals("expected: == " + coreSize + ", actual: " + prestarted, coreSize, prestarted);
+ Assert.assertEquals("expected: == " + coreSize + ", actual: " + executor.getPoolSize(), coreSize, executor.getPoolSize());
+ executor.shutdown();
+ }
+
+ private void waitForPoolSize(EnhancedQueueExecutor executor, int expectedPoolSize, long waitMillis) throws TimeoutException, InterruptedException {
+ long deadline = System.currentTimeMillis() + waitMillis;
+ long delayMillis = 100;
+
+ do {
+ if (executor.getPoolSize() == expectedPoolSize) {
+ break;
+ }
+ Thread.sleep(delayMillis);
+ } while (System.currentTimeMillis() + delayMillis < deadline);
+ if (executor.getPoolSize() != expectedPoolSize) {
+ throw new TimeoutException("Timed out waiting for pool size to become " + expectedPoolSize
+ + ", current pool size is " + executor.getPoolSize());
+ }
+ }
+
+ private void waitForActiveCount(EnhancedQueueExecutor executor, int expectedActiveCount, long waitMillis) throws TimeoutException, InterruptedException {
+ long deadline = System.currentTimeMillis() + waitMillis;
+ long delayMillis = 100;
+
+ do {
+ if (executor.getActiveCount() == expectedActiveCount) {
+ break;
+ }
+ Thread.sleep(delayMillis);
+ } while (System.currentTimeMillis() + delayMillis < deadline);
+ if (executor.getActiveCount() != expectedActiveCount) {
+ throw new TimeoutException("Timed out waiting for active count to become " + expectedActiveCount
+ + ", current active count is " + executor.getActiveCount());
+ }
+ }
+
@Test
public void testEnhancedExecutorShutdownNoTasks() throws Exception {
final CountDownLatch terminateLatch = new CountDownLatch(1);
@@ -57,7 +375,7 @@ public void run() {
})
.build();
- for(int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 10000; ++i) {
executor.submit(new Runnable() {
@Override
public void run() {