Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,334 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jboss.threads;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Assert;
import org.junit.Test;

/**
* Tests for checking EnhancedTreadPoolExecutor
* <p>
*/
public class EnhancedThreadQueueExecutorTestCase {

private int coreSize = 4;
private int maxSize = 7;
private long keepaliveTimeMillis = 1000;
private long defaultWaitTimeout = 5000;

class TestTask implements Runnable {
CountDownLatch exitLatch;
CountDownLatch allThreadsRunningLatch;

private TestTask(CountDownLatch exitLatch, CountDownLatch allThreadsRunningLatch) {
this.exitLatch = exitLatch;
this.allThreadsRunningLatch = allThreadsRunningLatch;
}

@Override
public void run() {
try {
allThreadsRunningLatch.countDown();
exitLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* Test invalid values:
* * Negative keepAlive, coreSize, maxSize
* * maxSize > coreSize
*/
@Test(expected = IllegalArgumentException.class)
public void testInvalidValuesKeepAliveZero() {
new EnhancedQueueExecutor.Builder()
.setKeepAliveTime(0, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidValuesKeepAliveNegative() {
new EnhancedQueueExecutor.Builder()
.setKeepAliveTime(-3456, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidValuesCoreSizeNegative() {
new EnhancedQueueExecutor.Builder()
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(-5)
.setMaximumPoolSize(maxSize)
.build();
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidValuesMaxSizeNegative() {
new EnhancedQueueExecutor.Builder()
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(-3)
.build();
}

@Test
public void testCoreSizeBiggerThanMaxSize() {
int expectedCorePoolSize = 5;
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(2 * expectedCorePoolSize)
.setMaximumPoolSize(expectedCorePoolSize)
.build();
Assert.assertEquals("Core size should be automatically adjusted to be equal to max size in case it's bigger.", expectedCorePoolSize, executor.getCorePoolSize());
}

/**
* Test that unused threads are being reused. Scenario:
* <ul>
* <li>max threads = 2x, core threads = x</li>
* <li>schedule x tasks, wait for tasks to finish</li>
* <li>schedule x tasks, expect pool size = x immediately after</li>
* </ul>
*/
@Test
public void testThreadReuse() throws TimeoutException, InterruptedException {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();

CountDownLatch exitLatch = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);

for (int i = 0; i < coreSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
exitLatch.countDown();
waitForActiveCount(executor, 0, defaultWaitTimeout);
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
exitLatch = new CountDownLatch(1);
allThreadsRunningLatch = new CountDownLatch(coreSize);
for (int i = 0; i < coreSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
exitLatch.countDown();
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
executor.shutdown();
}

/**
* Test thread reuse above core size
* Scenario:
* <ul>
* <li>setKeepAlive=60 sec</li>
* <li>max threads = 2x, core threads = x</li>
* <li>schedule x tasks and wait to occupy all core threads </li>
* <li>schedule one more task and let it finish</li>
* <li>schedule one task and check that pool size is still x+1</li>
* </ul>
*/
@Test
public void testThreadReuseAboveCoreSize() throws TimeoutException, InterruptedException {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(60000, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();

// submit 3 tasks to fill core size
CountDownLatch exitLatch = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
for (int i = 0; i < coreSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
waitForPoolSize(executor, coreSize, defaultWaitTimeout);

// submit one more task and allow it to finish
CountDownLatch singleExitLatch = new CountDownLatch(1);
CountDownLatch threadRunningLatch = new CountDownLatch(1);
executor.execute(new TestTask(singleExitLatch, threadRunningLatch));
threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS);
waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout);
singleExitLatch.countDown();
waitForActiveCount(executor, coreSize, defaultWaitTimeout);

// now there are just core threads and one free thread, submit another task and check it's reused
singleExitLatch = new CountDownLatch(1);
threadRunningLatch = new CountDownLatch(1);
executor.execute(new TestTask(singleExitLatch, threadRunningLatch));
threadRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS);
waitForPoolSize(executor, coreSize + 1, defaultWaitTimeout);
singleExitLatch.countDown();

// finish all
exitLatch.countDown();
executor.shutdown();
}

/**
* Test that keepalive time is honored and threads above the core count are being removed when no tasks are
* available.
*
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testKeepaliveTime() throws TimeoutException, InterruptedException {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.allowCoreThreadTimeOut(false)
.build();

CountDownLatch exitLatch = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);
for (int i = 0; i < coreSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
Assert.assertTrue("Not all core threads are running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
CountDownLatch exitLatch2 = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch2 = new CountDownLatch(maxSize - coreSize);
for (int i = 0; i < (maxSize - coreSize); i++) {
executor.execute(new TestTask(exitLatch2, allThreadsRunningLatch2));
}
Assert.assertTrue("Not all above core threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch2.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
waitForPoolSize(executor, maxSize, defaultWaitTimeout);

// finish core tasks and let timeout "core" threads
exitLatch.countDown();
waitForActiveCount(executor, maxSize - coreSize, defaultWaitTimeout);
waitForPoolSize(executor, Math.max(coreSize, (maxSize - coreSize)), defaultWaitTimeout);
exitLatch2.countDown();
waitForActiveCount(executor, 0, defaultWaitTimeout);
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
executor.shutdown();
}

/**
* Test that keepalive time is ignored when core threads are the same as max
* threads and core thread time out is disabled.
*/
@Test
public void testKeepaliveTime2() throws TimeoutException, InterruptedException {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(coreSize)
.build();

CountDownLatch exitLatch = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch = new CountDownLatch(coreSize);

for (int i = 0; i < coreSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
exitLatch.countDown();
waitForActiveCount(executor, 0, defaultWaitTimeout);
waitForPoolSize(executor, coreSize, defaultWaitTimeout);
executor.shutdown();
}

/**
* Test the keepalive setting with core thread time out enabled.
*/
@Test
public void testKeepaliveTimeWithCoreThreadTimeoutEnabled() throws TimeoutException, InterruptedException {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.allowCoreThreadTimeOut(true)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();

CountDownLatch exitLatch = new CountDownLatch(1);
CountDownLatch allThreadsRunningLatch = new CountDownLatch(maxSize);

for (int i = 0; i < maxSize; i++) {
executor.execute(new TestTask(exitLatch, allThreadsRunningLatch));
}
// this will make sure that all thread are running at the same time
Assert.assertTrue("Not all threads were running. They were most likely not scheduled for execution.",
allThreadsRunningLatch.await(defaultWaitTimeout, TimeUnit.MILLISECONDS));
exitLatch.countDown();
waitForActiveCount(executor, 0, defaultWaitTimeout);
waitForPoolSize(executor, 0, defaultWaitTimeout);
executor.shutdown();
}

/**
* Tests that prestarting core threads starts exactly the core threads amount specified.
*/
@Test
public void testPrestartCoreThreads() {
EnhancedQueueExecutor executor = (new EnhancedQueueExecutor.Builder())
.setKeepAliveTime(keepaliveTimeMillis, TimeUnit.MILLISECONDS)
.setCorePoolSize(coreSize)
.setMaximumPoolSize(maxSize)
.build();
int prestarted = executor.prestartAllCoreThreads();
Assert.assertEquals("expected: == " + coreSize + ", actual: " + prestarted, coreSize, prestarted);
Assert.assertEquals("expected: == " + coreSize + ", actual: " + executor.getPoolSize(), coreSize, executor.getPoolSize());
executor.shutdown();
}

private void waitForPoolSize(EnhancedQueueExecutor executor, int expectedPoolSize, long waitMillis) throws TimeoutException, InterruptedException {
long deadline = System.currentTimeMillis() + waitMillis;
long delayMillis = 100;

do {
if (executor.getPoolSize() == expectedPoolSize) {
break;
}
Thread.sleep(delayMillis);
} while (System.currentTimeMillis() + delayMillis < deadline);
if (executor.getPoolSize() != expectedPoolSize) {
throw new TimeoutException("Timed out waiting for pool size to become " + expectedPoolSize
+ ", current pool size is " + executor.getPoolSize());
}
}

private void waitForActiveCount(EnhancedQueueExecutor executor, int expectedActiveCount, long waitMillis) throws TimeoutException, InterruptedException {
long deadline = System.currentTimeMillis() + waitMillis;
long delayMillis = 100;

do {
if (executor.getActiveCount() == expectedActiveCount) {
break;
}
Thread.sleep(delayMillis);
} while (System.currentTimeMillis() + delayMillis < deadline);
if (executor.getActiveCount() != expectedActiveCount) {
throw new TimeoutException("Timed out waiting for active count to become " + expectedActiveCount
+ ", current active count is " + executor.getActiveCount());
}
}

@Test
public void testEnhancedExecutorShutdownNoTasks() throws Exception {
final CountDownLatch terminateLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -57,7 +375,7 @@ public void run() {
})
.build();

for(int i = 0; i < 10000; ++i) {
for (int i = 0; i < 10000; ++i) {
executor.submit(new Runnable() {
@Override
public void run() {
Expand Down