From d26c3fe404f18c7d1355cc6b0bb7e4f4725c5ca0 Mon Sep 17 00:00:00 2001 From: mnovak Date: Tue, 13 Aug 2019 08:53:06 +0200 Subject: [PATCH] [JBTHR-77] Add new tests for testing EnhacedThreadPoolExecutor --- .../EnhancedThreadQueueExecutorTestCase.java | 320 +++++++++++++++++- 1 file changed, 319 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java b/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java index 39299ce9..47c2acfe 100644 --- a/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java +++ b/src/test/java/org/jboss/threads/EnhancedThreadQueueExecutorTestCase.java @@ -15,16 +15,334 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.jboss.threads; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Assert; import org.junit.Test; +/** + * Tests for checking EnhancedTreadPoolExecutor + *

+ */ public class EnhancedThreadQueueExecutorTestCase { + private int coreSize = 4; + private int maxSize = 7; + private long keepaliveTimeMillis = 1000; + private long defaultWaitTimeout = 5000; + + class TestTask implements Runnable { + CountDownLatch exitLatch; + CountDownLatch allThreadsRunningLatch; + + private TestTask(CountDownLatch exitLatch, CountDownLatch allThreadsRunningLatch) { + this.exitLatch = exitLatch; + this.allThreadsRunningLatch = allThreadsRunningLatch; + } + + @Override + public void run() { + try { + allThreadsRunningLatch.countDown(); + exitLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Test invalid values: + * * Negative keepAlive, coreSize, maxSize + * * maxSize > coreSize + */ + @Test(expected = IllegalArgumentException.class) + public void testInvalidValuesKeepAliveZero() { + new EnhancedQueueExecutor.Builder() + .setKeepAliveTime(0, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidValuesKeepAliveNegative() { + new EnhancedQueueExecutor.Builder() + .setKeepAliveTime(-3456, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidValuesCoreSizeNegative() { + new EnhancedQueueExecutor.Builder() + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(-5) + .setMaximumPoolSize(maxSize) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidValuesMaxSizeNegative() { + new EnhancedQueueExecutor.Builder() + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(-3) + .build(); + } + + @Test + public void testCoreSizeBiggerThanMaxSize() { + int expectedCorePoolSize = 5; + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(2 * expectedCorePoolSize) + .setMaximumPoolSize(expectedCorePoolSize) + .build(); + Assert.assertEquals("Core size should be automatically adjusted to be equal to max size in case it's bigger.", expectedCorePoolSize, executor.getCorePoolSize()); + } + + /** + * Test that unused threads are being reused. Scenario: + *

+ */ + @Test + public void testThreadReuse() throws TimeoutException, InterruptedException { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + + CountDownLatch exitLatch = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize); + + for (int i = 0; i < coreSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + exitLatch.countDown(); + waitForActiveCount(executor, 0, defaultWaitTimeout); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + exitLatch = new CountDownLatch(1); + allThreadsRunningLatch = new CountDownLatch(coreSize); + for (int i = 0; i < coreSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + exitLatch.countDown(); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + executor.shutdown(); + } + + /** + * Test thread reuse above core size + * Scenario: + * + */ + @Test + public void testThreadReuseAboveCoreSize() throws TimeoutException, InterruptedException { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(60000, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + + // submit 3 tasks to fill core size + CountDownLatch exitLatch = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize); + for (int i = 0; i < coreSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + + // submit one more task and allow it to finish + CountDownLatch singleExitLatch = new CountDownLatch(1); + CountDownLatch threadRunningLatch = new CountDownLatch(1); + executor.execute(new TestTask(singleExitLatch, threadRunningLatch)); + threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS); + waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout); + singleExitLatch.countDown(); + waitForActiveCount(executor, coreSize, defaultWaitTimeout); + + // now there are just core threads and one free thread, submit another task and check it's reused + singleExitLatch = new CountDownLatch(1); + threadRunningLatch = new CountDownLatch(1); + executor.execute(new TestTask(singleExitLatch, threadRunningLatch)); + threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS); + waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout); + singleExitLatch.countDown(); + + // finish all + exitLatch.countDown(); + executor.shutdown(); + } + + /** + * Test that keepalive time is honored and threads above the core count are being removed when no tasks are + * available. + * + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testKeepaliveTime() throws TimeoutException, InterruptedException { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .allowCoreThreadTimeOut(false) + .build(); + + CountDownLatch exitLatch = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize); + for (int i = 0; i < coreSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + Assert.assertTrue("Not all core threads are running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + CountDownLatch exitLatch2 = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch2 = new CountDownLatch(maxSize - coreSize); + for (int i = 0; i < (maxSize - coreSize); i++) { + executor.execute(new TestTask(exitLatch2, allThreadsRunningLatch2)); + } + Assert.assertTrue("Not all above core threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch2.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + waitForPoolSize(executor, maxSize, defaultWaitTimeout); + + // finish core tasks and let timeout "core" threads + exitLatch.countDown(); + waitForActiveCount(executor, maxSize - coreSize, defaultWaitTimeout); + waitForPoolSize(executor, Math.max(coreSize, (maxSize - coreSize)), defaultWaitTimeout); + exitLatch2.countDown(); + waitForActiveCount(executor, 0, defaultWaitTimeout); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + executor.shutdown(); + } + + /** + * Test that keepalive time is ignored when core threads are the same as max + * threads and core thread time out is disabled. + */ + @Test + public void testKeepaliveTime2() throws TimeoutException, InterruptedException { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(coreSize) + .build(); + + CountDownLatch exitLatch = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize); + + for (int i = 0; i < coreSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + exitLatch.countDown(); + waitForActiveCount(executor, 0, defaultWaitTimeout); + waitForPoolSize(executor, coreSize, defaultWaitTimeout); + executor.shutdown(); + } + + /** + * Test the keepalive setting with core thread time out enabled. + */ + @Test + public void testKeepaliveTimeWithCoreThreadTimeoutEnabled() throws TimeoutException, InterruptedException { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .allowCoreThreadTimeOut(true) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + + CountDownLatch exitLatch = new CountDownLatch(1); + CountDownLatch allThreadsRunningLatch = new CountDownLatch(maxSize); + + for (int i = 0; i < maxSize; i++) { + executor.execute(new TestTask(exitLatch, allThreadsRunningLatch)); + } + // this will make sure that all thread are running at the same time + Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.", + allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS)); + exitLatch.countDown(); + waitForActiveCount(executor, 0, defaultWaitTimeout); + waitForPoolSize(executor, 0, defaultWaitTimeout); + executor.shutdown(); + } + + /** + * Tests that prestarting core threads starts exactly the core threads amount specified. + */ + @Test + public void testPrestartCoreThreads() { + EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder()) + .setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS) + .setCorePoolSize(coreSize) + .setMaximumPoolSize(maxSize) + .build(); + int prestarted = executor.prestartAllCoreThreads(); + Assert.assertEquals("expected: == " + coreSize + ", actual: " + prestarted, coreSize, prestarted); + Assert.assertEquals("expected: == " + coreSize + ", actual: " + executor.getPoolSize(), coreSize, executor.getPoolSize()); + executor.shutdown(); + } + + private void waitForPoolSize(EnhancedQueueExecutor executor, int expectedPoolSize, long waitMillis) throws TimeoutException, InterruptedException { + long deadline = System.currentTimeMillis() + waitMillis; + long delayMillis = 100; + + do { + if (executor.getPoolSize() == expectedPoolSize) { + break; + } + Thread.sleep(delayMillis); + } while (System.currentTimeMillis() + delayMillis < deadline); + if (executor.getPoolSize() != expectedPoolSize) { + throw new TimeoutException("Timed out waiting for pool size to become " + expectedPoolSize + + ", current pool size is " + executor.getPoolSize()); + } + } + + private void waitForActiveCount(EnhancedQueueExecutor executor, int expectedActiveCount, long waitMillis) throws TimeoutException, InterruptedException { + long deadline = System.currentTimeMillis() + waitMillis; + long delayMillis = 100; + + do { + if (executor.getActiveCount() == expectedActiveCount) { + break; + } + Thread.sleep(delayMillis); + } while (System.currentTimeMillis() + delayMillis < deadline); + if (executor.getActiveCount() != expectedActiveCount) { + throw new TimeoutException("Timed out waiting for active count to become " + expectedActiveCount + + ", current active count is " + executor.getActiveCount()); + } + } + @Test public void testEnhancedExecutorShutdownNoTasks() throws Exception { final CountDownLatch terminateLatch = new CountDownLatch(1); @@ -57,7 +375,7 @@ public void run() { }) .build(); - for(int i = 0; i < 10000; ++i) { + for (int i = 0; i < 10000; ++i) { executor.submit(new Runnable() { @Override public void run() {