diff --git a/src/main/java/org/threadly/concurrent/PriorityScheduledExecutor.java b/src/main/java/org/threadly/concurrent/PriorityScheduledExecutor.java index 543b20147..6cc3b732b 100644 --- a/src/main/java/org/threadly/concurrent/PriorityScheduledExecutor.java +++ b/src/main/java/org/threadly/concurrent/PriorityScheduledExecutor.java @@ -661,6 +661,10 @@ protected static boolean isContained(Runnable startRunnable, return false; } } + } else if (startRunnable instanceof RunnableContainerInterface) { + RunnableContainerInterface rci = (RunnableContainerInterface)startRunnable; + + return isContained(rci.getContainedRunnable(), compareTo); } else { return false; } diff --git a/src/main/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiter.java b/src/main/java/org/threadly/concurrent/limiter/SchedulerLimiter.java similarity index 74% rename from src/main/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiter.java rename to src/main/java/org/threadly/concurrent/limiter/SchedulerLimiter.java index 098f17b01..a01f22244 100644 --- a/src/main/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiter.java +++ b/src/main/java/org/threadly/concurrent/limiter/SchedulerLimiter.java @@ -1,8 +1,13 @@ package org.threadly.concurrent.limiter; +import java.util.concurrent.Callable; + import org.threadly.concurrent.RunnableContainerInterface; import org.threadly.concurrent.SimpleSchedulerInterface; +import org.threadly.concurrent.SubmitterSchedulerInterface; import org.threadly.concurrent.VirtualRunnable; +import org.threadly.concurrent.future.ListenableFuture; +import org.threadly.concurrent.future.ListenableFutureTask; /** *

This class is designed to limit how much parallel execution happens @@ -21,8 +26,8 @@ * * @author jent - Mike Jensen */ -public class SimpleSchedulerLimiter extends ExecutorLimiter - implements SimpleSchedulerInterface { +public class SchedulerLimiter extends ExecutorLimiter + implements SubmitterSchedulerInterface { protected final SimpleSchedulerInterface scheduler; /** @@ -31,8 +36,8 @@ public class SimpleSchedulerLimiter extends ExecutorLimiter * @param scheduler {@link SimpleSchedulerInterface} implementation to submit task executions to. * @param maxConcurrency maximum qty of runnables to run in parallel */ - public SimpleSchedulerLimiter(SimpleSchedulerInterface scheduler, - int maxConcurrency) { + public SchedulerLimiter(SimpleSchedulerInterface scheduler, + int maxConcurrency) { this(scheduler, maxConcurrency, null); } @@ -43,8 +48,8 @@ public SimpleSchedulerLimiter(SimpleSchedulerInterface scheduler, * @param maxConcurrency maximum qty of runnables to run in parallel * @param subPoolName name to describe threads while tasks running in pool (null to not change thread names) */ - public SimpleSchedulerLimiter(SimpleSchedulerInterface scheduler, - int maxConcurrency, String subPoolName) { + public SchedulerLimiter(SimpleSchedulerInterface scheduler, + int maxConcurrency, String subPoolName) { super(scheduler, maxConcurrency, subPoolName); this.scheduler = scheduler; @@ -55,6 +60,45 @@ public boolean isShutdown() { return scheduler.isShutdown(); } + @Override + public ListenableFuture submitScheduled(Runnable task, long delayInMs) { + if (task == null) { + throw new IllegalArgumentException("Must provide task"); + } + + ListenableFutureTask ft = new ListenableFutureTask(false, task); + + schedule(ft, delayInMs); + + return ft; + } + + @Override + public ListenableFuture submitScheduled(Runnable task, T result, long delayInMs) { + if (task == null) { + throw new IllegalArgumentException("Must provide task"); + } + + ListenableFutureTask ft = new ListenableFutureTask(false, task, result); + + schedule(ft, delayInMs); + + return ft; + } + + @Override + public ListenableFuture submitScheduled(Callable task, long delayInMs) { + if (task == null) { + throw new IllegalArgumentException("Must provide task"); + } + + ListenableFutureTask ft = new ListenableFutureTask(false, task); + + schedule(ft, delayInMs); + + return ft; + } + @Override public void schedule(Runnable task, long delayInMs) { if (task == null) { diff --git a/src/main/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiter.java b/src/main/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiter.java deleted file mode 100644 index e9a1e3aa0..000000000 --- a/src/main/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiter.java +++ /dev/null @@ -1,374 +0,0 @@ -package org.threadly.concurrent.limiter; - -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Future; - -import org.threadly.concurrent.CallableContainerInterface; -import org.threadly.concurrent.RunnableContainerInterface; -import org.threadly.concurrent.SubmitterSchedulerInterface; -import org.threadly.concurrent.VirtualRunnable; -import org.threadly.concurrent.future.FutureListenableFuture; -import org.threadly.concurrent.future.ListenableFuture; - -/** - *

This class is designed to limit how much parallel execution happens - * on a provided {@link SubmitterSchedulerInterface}. This allows the - * implementor to have one thread pool for all their code, and if - * they want certain sections to have less levels of parallelism - * (possibly because those those sections would completely consume the - * global pool), they can wrap the executor in this class.

- * - *

Thus providing you better control on the absolute thread count and - * how much parallelism can occur in different sections of the program.

- * - *

This is an alternative from having to create multiple thread pools. - * By using this you also are able to accomplish more efficiently thread use - * than multiple thread pools would.

- * - * @author jent - Mike Jensen - */ -public class SubmitterSchedulerLimiter extends AbstractSchedulerLimiter - implements SubmitterSchedulerInterface { - protected final SubmitterSchedulerInterface scheduler; - protected final Queue waitingTasks; - - /** - * Constructs a new limiter that implements the {@link SubmitterSchedulerInterface}. - * - * @param scheduler {@link SubmitterSchedulerInterface} implementation to submit task executions to. - * @param maxConcurrency maximum qty of runnables to run in parallel - */ - public SubmitterSchedulerLimiter(SubmitterSchedulerInterface scheduler, - int maxConcurrency) { - this(scheduler, maxConcurrency, null); - } - - /** - * Constructs a new limiter that implements the {@link SubmitterSchedulerInterface}. - * - * @param scheduler {@link SubmitterSchedulerInterface} implementation to submit task executions to. - * @param maxConcurrency maximum qty of runnables to run in parallel - * @param subPoolName name to describe threads while tasks running in pool (null to not change thread names) - */ - public SubmitterSchedulerLimiter(SubmitterSchedulerInterface scheduler, - int maxConcurrency, String subPoolName) { - super(maxConcurrency, subPoolName); - - if (scheduler == null) { - throw new IllegalArgumentException("Must provide scheduler"); - } - - this.scheduler = scheduler; - waitingTasks = new ConcurrentLinkedQueue(); - } - - @Override - protected void consumeAvailable() { - /* must synchronize in queue consumer to avoid - * multiple threads from consuming tasks in parallel - * and possibly emptying after .isEmpty() check but - * before .poll() - */ - synchronized (this) { - while (! waitingTasks.isEmpty() && canRunTask()) { - // by entering loop we can now execute task - Wrapper next = waitingTasks.poll(); - if (next.hasFuture()) { - if (next.isCallable()) { - Future f = scheduler.submit(next.getCallable()); - next.getFuture().setParentFuture(f); - } else { - Future f = scheduler.submit(next.getRunnable()); - next.getFuture().setParentFuture(f); - } - } else { - // all callables will have futures, so we know this is a runnable - scheduler.execute(next.getRunnable()); - } - } - } - } - - @Override - public boolean isShutdown() { - return scheduler.isShutdown(); - } - - @Override - public void execute(Runnable task) { - if (task == null) { - throw new IllegalArgumentException("Must provide task"); - } - - RunnableFutureWrapper wrapper = new RunnableFutureWrapper(task, null); - - if (canRunTask()) { // try to avoid adding to queue if we can - scheduler.execute(wrapper); - } else { - waitingTasks.add(wrapper); - consumeAvailable(); // call to consume in case task finished after first check - } - } - - @Override - public ListenableFuture submit(Runnable task) { - return submit(task, null); - } - - @Override - public ListenableFuture submit(Runnable task, T result) { - if (task == null) { - throw new IllegalArgumentException("Must provide task"); - } - - FutureListenableFuture ff = new FutureListenableFuture(); - - doSubmit(task, result, ff); - - return ff; - } - - private void doSubmit(Runnable task, T result, - FutureListenableFuture ff) { - RunnableFutureWrapper wrapper = new RunnableFutureWrapper(task, ff); - ff.setTaskCanceler(wrapper); - - if (canRunTask()) { // try to avoid adding to queue if we can - ff.setParentFuture(scheduler.submit(wrapper, result)); - } else { - waitingTasks.add(wrapper); - consumeAvailable(); // call to consume in case task finished after first check - } - } - - @Override - public ListenableFuture submit(Callable task) { - if (task == null) { - throw new IllegalArgumentException("Must provide task"); - } - - FutureListenableFuture ff = new FutureListenableFuture(); - - doSubmit(task, ff); - - return ff; - } - - private void doSubmit(Callable task, - FutureListenableFuture ff) { - CallableFutureWrapper wrapper = new CallableFutureWrapper(task, ff); - ff.setTaskCanceler(wrapper); - - if (canRunTask()) { // try to avoid adding to queue if we can - ff.setParentFuture(scheduler.submit(wrapper)); - } else { - waitingTasks.add(wrapper); - consumeAvailable(); // call to consume in case task finished after first check - } - } - - @Override - public void schedule(Runnable task, long delayInMs) { - if (task == null) { - throw new IllegalArgumentException("Must provide a task"); - } else if (delayInMs < 0) { - throw new IllegalArgumentException("delayInMs must be >= 0"); - } - - if (delayInMs == 0) { - execute(task); - } else { - scheduler.schedule(new DelayedExecutionRunnable(task, null, null), - delayInMs); - } - } - - @Override - public ListenableFuture submitScheduled(Runnable task, long delayInMs) { - return submitScheduled(task, null, delayInMs); - } - - @Override - public ListenableFuture submitScheduled(Runnable task, T result, long delayInMs) { - if (task == null) { - throw new IllegalArgumentException("Must provide a task"); - } else if (delayInMs < 0) { - throw new IllegalArgumentException("delayInMs must be >= 0"); - } - - FutureListenableFuture ff = new FutureListenableFuture(); - if (delayInMs == 0) { - doSubmit(task, result, ff); - } else { - scheduler.schedule(new DelayedExecutionRunnable(task, result, ff), - delayInMs); - } - - return ff; - } - - @Override - public ListenableFuture submitScheduled(Callable task, long delayInMs) { - if (task == null) { - throw new IllegalArgumentException("Must provide a task"); - } else if (delayInMs < 0) { - throw new IllegalArgumentException("delayInMs must be >= 0"); - } - - FutureListenableFuture ff = new FutureListenableFuture(); - if (delayInMs == 0) { - doSubmit(task, ff); - } else { - scheduler.schedule(new DelayedExecutionCallable(task, ff), - delayInMs); - } - - return ff; - } - - @Override - public void scheduleWithFixedDelay(Runnable task, long initialDelay, - long recurringDelay) { - if (task == null) { - throw new IllegalArgumentException("Must provide a task"); - } else if (initialDelay < 0) { - throw new IllegalArgumentException("initialDelay must be >= 0"); - } else if (recurringDelay < 0) { - throw new IllegalArgumentException("recurringDelay must be >= 0"); - } - - RecurringRunnableWrapper rrw = new RecurringRunnableWrapper(task, recurringDelay); - - if (initialDelay == 0) { - execute(rrw); - } else { - scheduler.schedule(new DelayedExecutionRunnable(rrw, null, null), - initialDelay); - } - } - - /** - *

Small runnable that allows scheduled tasks to pass through - * the same execution queue that immediate execution has to.

- * - * @author jent - Mike Jensen - */ - protected class DelayedExecutionRunnable extends VirtualRunnable - implements RunnableContainerInterface { - private final Runnable runnable; - private final T runnableResult; - private final FutureListenableFuture future; - - public DelayedExecutionRunnable(Runnable runnable, T runnableResult, - FutureListenableFuture future) { - this.runnable = runnable; - this.runnableResult = runnableResult; - this.future = future; - } - - @Override - public void run() { - if (future == null) { - execute(runnable); - } else { - doSubmit(runnable, runnableResult, future); - } - } - - @Override - public Runnable getContainedRunnable() { - return runnable; - } - } - - /** - *

Small runnable that allows scheduled tasks to pass through - * the same execution queue that immediate execution has to.

- * - * @author jent - Mike Jensen - */ - protected class DelayedExecutionCallable extends VirtualRunnable - implements CallableContainerInterface, - RunnableContainerInterface { - private final Callable callable; - private final FutureListenableFuture future; - - public DelayedExecutionCallable(Callable runnable, - FutureListenableFuture future) { - this.callable = runnable; - this.future = future; - } - - @Override - public void run() { - doSubmit(callable, future); - } - - @Override - public Runnable getContainedRunnable() { - if (callable instanceof RunnableContainerInterface) { - return ((RunnableContainerInterface)callable).getContainedRunnable(); - } else { - return null; - } - } - - @Override - public Callable getContainedCallable() { - return callable; - } - } - - /** - *

Wrapper for tasks which are executed in this sub pool, - * this ensures that handleTaskFinished() will be called - * after the task completes.

- * - * @author jent - Mike Jensen - */ - protected class RecurringRunnableWrapper extends LimiterRunnableWrapper - implements Wrapper { - private final long recurringDelay; - private final DelayedExecutionRunnable delayRunnable; - - public RecurringRunnableWrapper(Runnable runnable, - long recurringDelay) { - super(runnable); - - this.recurringDelay = recurringDelay; - delayRunnable = new DelayedExecutionRunnable(this, null, null); - } - - @Override - protected void doAfterRunTasks() { - scheduler.schedule(delayRunnable, recurringDelay); - } - - @Override - public boolean isCallable() { - return false; - } - - @Override - public FutureListenableFuture getFuture() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean hasFuture() { - return false; - } - - @Override - public Callable getCallable() { - throw new UnsupportedOperationException(); - } - - @Override - public Runnable getRunnable() { - return this; - } - } -} diff --git a/src/test/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiterTest.java b/src/test/java/org/threadly/concurrent/limiter/SchedulerLimiterTest.java similarity index 76% rename from src/test/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiterTest.java rename to src/test/java/org/threadly/concurrent/limiter/SchedulerLimiterTest.java index 533028b6a..a2302164c 100644 --- a/src/test/java/org/threadly/concurrent/limiter/SubmitterSchedulerLimiterTest.java +++ b/src/test/java/org/threadly/concurrent/limiter/SchedulerLimiterTest.java @@ -7,7 +7,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -20,27 +19,25 @@ import org.threadly.concurrent.SubmitterExecutorInterfaceTest; import org.threadly.concurrent.SubmitterSchedulerInterface; import org.threadly.concurrent.SubmitterSchedulerInterfaceTest; -import org.threadly.concurrent.TaskPriority; -import org.threadly.concurrent.SubmitterSchedulerInterfaceTest.SubmitterSchedulerFactory; import org.threadly.concurrent.TestCallable; -import org.threadly.concurrent.future.FutureListenableFuture; -import org.threadly.concurrent.limiter.SubmitterSchedulerLimiter; +import org.threadly.concurrent.SubmitterSchedulerInterfaceTest.SubmitterSchedulerFactory; +import org.threadly.concurrent.TaskPriority; import org.threadly.test.concurrent.TestRunnable; import org.threadly.test.concurrent.TestablePriorityScheduler; @SuppressWarnings("javadoc") -public class SubmitterSchedulerLimiterTest { +public class SchedulerLimiterTest { @Test public void constructorFail() { try { - new SubmitterSchedulerLimiter(null, 100); + new SchedulerLimiter(null, 100); fail("Exception should have thrown"); } catch (IllegalArgumentException e) { // expected } PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 100); try { - new SubmitterSchedulerLimiter(executor, 0); + new SchedulerLimiter(executor, 0); fail("Exception should have thrown"); } catch (IllegalArgumentException e) { // expected @@ -53,7 +50,7 @@ public void constructorFail() { public void constructorEmptySubPoolNameTest() { PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 100); try { - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(executor, 1, " "); + SchedulerLimiter limiter = new SchedulerLimiter(executor, 1, " "); assertNull(limiter.subPoolName); } finally { @@ -65,40 +62,18 @@ public void constructorEmptySubPoolNameTest() { public void consumeAvailableTest() { int testQty = 10; PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 10, TaskPriority.High, 100); - SubmitterSchedulerLimiter psl = new SubmitterSchedulerLimiter(executor, testQty); - - boolean flip1 = true; - boolean flip2 = true; + SchedulerLimiter limiter = new SchedulerLimiter(executor, testQty); List runnables = new ArrayList(testQty); for (int i = 0; i < testQty; i++) { - - if (flip1) { - TestRunnable tr = new TestRunnable(); - runnables.add(tr); - if (flip2) { - psl.waitingTasks.add(psl.new RunnableFutureWrapper(tr, - new FutureListenableFuture())); - flip2 = false; - } else { - psl.waitingTasks.add(psl.new RunnableFutureWrapper(tr, null)); - flip2 = true; - } - flip1 = false; - } else { - psl.waitingTasks.add(psl.new CallableFutureWrapper(new Callable() { - @Override - public Object call() throws Exception { - return new Object(); - } - }, new FutureListenableFuture())); - flip1 = true; - } + TestRunnable tr = new TestRunnable(); + runnables.add(tr); + limiter.waitingTasks.add(limiter.new LimiterRunnableWrapper(tr)); } - psl.consumeAvailable(); + limiter.consumeAvailable(); // should be fully consumed - assertEquals(0, psl.waitingTasks.size()); + assertEquals(0, limiter.waitingTasks.size()); Iterator it = runnables.iterator(); while (it.hasNext()) { @@ -120,6 +95,13 @@ public void executeNamedSubPoolTest() { SimpleSchedulerInterfaceTest.executeTest(sf); } + @Test (expected = IllegalArgumentException.class) + public void executeFail() { + SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); + + SimpleSchedulerInterfaceTest.executeFail(sf); + } + @Test public void submitRunnableTest() { submitRunnableTest(false); @@ -210,7 +192,7 @@ public void submitCallableTestableSchedulerTest() throws InterruptedException, E 1000 * 10); try { TestablePriorityScheduler testableScheduler = new TestablePriorityScheduler(executor); - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(executor, parallelCount); + SchedulerLimiter limiter = new SchedulerLimiter(executor, parallelCount); List callables = new ArrayList(runnableCount); List> futures = new ArrayList>(runnableCount); @@ -247,13 +229,6 @@ public void submitCallableTestableSchedulerTest() throws InterruptedException, E } } - @Test (expected = IllegalArgumentException.class) - public void executeTestFail() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.executeFail(sf); - } - @Test public void scheduleExecutionTest() { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); @@ -276,116 +251,87 @@ public void scheduleExecutionFail() { } @Test - public void submitScheduledRunnableTest() { - submitScheduledRunnableTest(false); + public void recurringExecutionTest() { + SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); + + SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); } @Test - public void submitScheduledRunnableNamedSubPoolTest() { - submitScheduledRunnableTest(true); + public void recurringExecutionNamedSubPoolTest() { + SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); + + SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); } - public void submitScheduledRunnableTest(boolean nameSubPool) { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(nameSubPool); - // we can't defer to the interface implementation for this check - try { - int runnableCount = 10; - int scheduleDelay = 50; - - SubmitterSchedulerInterface scheduler = sf.makeSubmitterScheduler(runnableCount, true); - - List runnables = new ArrayList(runnableCount); - List> futures = new ArrayList>(runnableCount); - for (int i = 0; i < runnableCount; i++) { - TestRunnable tr = new TestRunnable(); - Future future = scheduler.submitScheduled(tr, scheduleDelay); - assertNotNull(future); - runnables.add(tr); - futures.add(future); - } - - // verify execution and execution times - Iterator it = runnables.iterator(); - while (it.hasNext()) { - TestRunnable tr = it.next(); - long executionDelay = tr.getDelayTillFirstRun(); - assertTrue(executionDelay >= scheduleDelay); - // should be very timely with a core pool size that matches runnable count - assertTrue(executionDelay <= (scheduleDelay + 2000)); - assertEquals(1, tr.getRunCount()); - } - - Iterator> futureIt = futures.iterator(); - while (futureIt.hasNext()) { - Future f = futureIt.next(); - try { - f.get(); - } catch (InterruptedException e) { - fail(); - } catch (ExecutionException e) { - fail(); - } - assertTrue(f.isDone()); - } - } finally { - sf.shutdown(); - } + @Test + public void recurringExecutionFail() { + SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); + + SimpleSchedulerInterfaceTest.recurringExecutionFail(sf); } @Test - public void submitScheduledCallableTest() throws InterruptedException, ExecutionException { + public void submitScheduledRunnableTest() throws InterruptedException, ExecutionException { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - SubmitterSchedulerInterfaceTest.submitScheduledCallableTest(sf); + SubmitterSchedulerInterfaceTest.submitScheduledRunnableTest(sf); } @Test - public void submitScheduledCallableNamedSubPoolTest() throws InterruptedException, ExecutionException { + public void submitScheduledRunnableNamedSubPoolTest() throws InterruptedException, ExecutionException { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); - SubmitterSchedulerInterfaceTest.submitScheduledCallableTest(sf); + SubmitterSchedulerInterfaceTest.submitScheduledRunnableTest(sf); } @Test - public void submitScheduledRunnableFail() { + public void submitScheduledRunnableWithResultTest() throws InterruptedException, ExecutionException { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - SubmitterSchedulerInterfaceTest.submitScheduledRunnableFail(sf); + SubmitterSchedulerInterfaceTest.submitScheduledRunnableWithResultTest(sf); } @Test - public void submitScheduledCallableFail() { + public void submitScheduledRunnableWithResultNamedSubPoolTest() throws InterruptedException, ExecutionException { + SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); + + SubmitterSchedulerInterfaceTest.submitScheduledRunnableWithResultTest(sf); + } + + @Test + public void submitScheduledRunnableFail() { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - SubmitterSchedulerInterfaceTest.submitScheduledCallableFail(sf); + SubmitterSchedulerInterfaceTest.submitScheduledRunnableFail(sf); } @Test - public void recurringExecutionTest() { + public void submitScheduledCallableTest() throws InterruptedException, ExecutionException { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); + SubmitterSchedulerInterfaceTest.submitScheduledCallableTest(sf); } @Test - public void recurringExecutionNamedSubPoolTest() { + public void submitScheduledCallableNamedSubPoolTest() throws InterruptedException, ExecutionException { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); - SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); + SubmitterSchedulerInterfaceTest.submitScheduledCallableTest(sf); } @Test - public void recurringExecutionFail() { + public void submitScheduledCallableFail() { SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - SimpleSchedulerInterfaceTest.recurringExecutionFail(sf); + SubmitterSchedulerInterfaceTest.submitScheduledCallableFail(sf); } @Test public void removeRunnableTest() { PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(2, 2, 1000); try { - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(scheduler, 2); + SchedulerLimiter limiter = new SchedulerLimiter(scheduler, 2); TestRunnable task = new TestRunnable(); limiter.schedule(task, 1000 * 10); @@ -398,29 +344,12 @@ public void removeRunnableTest() { } } - @Test - public void removeCallableTest() { - PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(2, 2, 1000); - try { - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(scheduler, 1); - - TestCallable task = new TestCallable(); - limiter.submitScheduled(task, 1000 * 10); - - assertFalse(scheduler.remove(new TestCallable())); - - assertTrue(scheduler.remove(task)); - } finally { - scheduler.shutdownNow(); - } - } - @Test public void removeBlockedRunnableTest() { PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(1, 1, 1000); BlockingTestRunnable blockingRunnable = new BlockingTestRunnable(); try { - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(scheduler, 2); + SchedulerLimiter limiter = new SchedulerLimiter(scheduler, 2); scheduler.execute(blockingRunnable); scheduler.execute(blockingRunnable); blockingRunnable.blockTillStarted(); @@ -436,12 +365,29 @@ public void removeBlockedRunnableTest() { } } + @Test + public void removeCallableTest() { + PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(2, 2, 1000); + try { + SchedulerLimiter limiter = new SchedulerLimiter(scheduler, 1); + + TestCallable task = new TestCallable(); + limiter.submitScheduled(task, 1000 * 10); + + assertFalse(scheduler.remove(new TestCallable())); + + assertTrue(scheduler.remove(task)); + } finally { + scheduler.shutdownNow(); + } + } + @Test public void removeBlockedCallableTest() { PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(1, 1, 1000); BlockingTestRunnable blockingRunnable = new BlockingTestRunnable(); try { - SubmitterSchedulerLimiter limiter = new SubmitterSchedulerLimiter(scheduler, 2); + SchedulerLimiter limiter = new SchedulerLimiter(scheduler, 2); scheduler.execute(blockingRunnable); scheduler.execute(blockingRunnable); blockingRunnable.blockTillStarted(); @@ -473,21 +419,29 @@ public void uncaughtException(Thread t, Throwable e) { executors = new LinkedList(); this.addSubPoolName = addSubPoolName; } + + @Override + public void shutdown() { + Iterator it = executors.iterator(); + while (it.hasNext()) { + it.next().shutdown(); + it.remove(); + } + } @Override public SubmitterExecutorInterface makeSubmitterExecutor(int poolSize, boolean prestartIfAvailable) { return makeSubmitterScheduler(poolSize, prestartIfAvailable); } - + @Override - public SimpleSchedulerInterface makeSimpleScheduler(int poolSize, - boolean prestartIfAvailable) { + public SimpleSchedulerInterface makeSimpleScheduler(int poolSize, boolean prestartIfAvailable) { return makeSubmitterScheduler(poolSize, prestartIfAvailable); } - + @Override - public SubmitterSchedulerInterface makeSubmitterScheduler(int poolSize, + public SubmitterSchedulerInterface makeSubmitterScheduler(int poolSize, boolean prestartIfAvailable) { PriorityScheduledExecutor executor = new PriorityScheduledExecutor(poolSize, poolSize, 1000 * 10); @@ -497,18 +451,9 @@ public SubmitterSchedulerInterface makeSubmitterScheduler(int poolSize, executors.add(executor); if (addSubPoolName) { - return new SubmitterSchedulerLimiter(executor, poolSize, "TestSubPool"); + return new SchedulerLimiter(executor, poolSize, "TestSubPool"); } else { - return new SubmitterSchedulerLimiter(executor, poolSize); - } - } - - @Override - public void shutdown() { - Iterator it = executors.iterator(); - while (it.hasNext()) { - it.next().shutdown(); - it.remove(); + return new SchedulerLimiter(executor, poolSize); } } } diff --git a/src/test/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiterTest.java b/src/test/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiterTest.java deleted file mode 100644 index ecba55566..000000000 --- a/src/test/java/org/threadly/concurrent/limiter/SimpleSchedulerLimiterTest.java +++ /dev/null @@ -1,217 +0,0 @@ -package org.threadly.concurrent.limiter; - -import static org.junit.Assert.*; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.junit.Test; -import org.threadly.concurrent.BlockingTestRunnable; -import org.threadly.concurrent.PriorityScheduledExecutor; -import org.threadly.concurrent.SimpleSchedulerInterfaceTest; -import org.threadly.concurrent.SimpleSchedulerInterfaceTest.SimpleSchedulerFactory; -import org.threadly.concurrent.TaskPriority; -import org.threadly.test.concurrent.TestRunnable; - -@SuppressWarnings("javadoc") -public class SimpleSchedulerLimiterTest { - @Test - public void constructorFail() { - try { - new SimpleSchedulerLimiter(null, 100); - fail("Exception should have thrown"); - } catch (IllegalArgumentException e) { - // expected - } - PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 100); - try { - new SimpleSchedulerLimiter(executor, 0); - fail("Exception should have thrown"); - } catch (IllegalArgumentException e) { - // expected - } finally { - executor.shutdown(); - } - } - - @Test - public void constructorEmptySubPoolNameTest() { - PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 100); - try { - SimpleSchedulerLimiter limiter = new SimpleSchedulerLimiter(executor, 1, " "); - - assertNull(limiter.subPoolName); - } finally { - executor.shutdown(); - } - } - - @Test - public void consumeAvailableTest() { - int testQty = 10; - PriorityScheduledExecutor executor = new PriorityScheduledExecutor(1, 1, 10, TaskPriority.High, 100); - SimpleSchedulerLimiter limiter = new SimpleSchedulerLimiter(executor, testQty); - List runnables = new ArrayList(testQty); - for (int i = 0; i < testQty; i++) { - TestRunnable tr = new TestRunnable(); - runnables.add(tr); - limiter.waitingTasks.add(limiter.new LimiterRunnableWrapper(tr)); - } - - limiter.consumeAvailable(); - - // should be fully consumed - assertEquals(0, limiter.waitingTasks.size()); - - Iterator it = runnables.iterator(); - while (it.hasNext()) { - it.next().blockTillFinished(); // throws exception if it does not finish - } - } - - @Test - public void executeTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.executeTest(sf); - } - - @Test - public void executeNamedSubPoolTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); - - SimpleSchedulerInterfaceTest.executeTest(sf); - } - - @Test (expected = IllegalArgumentException.class) - public void executeTestFail() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.executeFail(sf); - } - - @Test - public void scheduleExecutionTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.scheduleTest(sf); - } - - @Test - public void scheduleExecutionNamedSubPoolTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); - - SimpleSchedulerInterfaceTest.scheduleTest(sf); - } - - @Test - public void scheduleExecutionFail() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.scheduleFail(sf); - } - - @Test - public void recurringExecutionTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); - } - - @Test - public void recurringExecutionNamedSubPoolTest() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(true); - - SimpleSchedulerInterfaceTest.recurringExecutionTest(sf); - } - - @Test - public void recurringExecutionFail() { - SchedulerLimiterFactory sf = new SchedulerLimiterFactory(false); - - SimpleSchedulerInterfaceTest.recurringExecutionFail(sf); - } - - @Test - public void removeRunnableTest() { - PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(2, 2, 1000); - try { - SimpleSchedulerLimiter limiter = new SimpleSchedulerLimiter(scheduler, 2); - - TestRunnable task = new TestRunnable(); - limiter.schedule(task, 1000 * 10); - - assertFalse(scheduler.remove(new TestRunnable())); - - assertTrue(scheduler.remove(task)); - } finally { - scheduler.shutdownNow(); - } - } - - @Test - public void removeBlockedRunnableTest() { - PriorityScheduledExecutor scheduler = new PriorityScheduledExecutor(1, 1, 1000); - BlockingTestRunnable blockingRunnable = new BlockingTestRunnable(); - try { - SimpleSchedulerLimiter limiter = new SimpleSchedulerLimiter(scheduler, 2); - scheduler.execute(blockingRunnable); - scheduler.execute(blockingRunnable); - blockingRunnable.blockTillStarted(); - - TestRunnable task = new TestRunnable(); - limiter.execute(task); - - assertFalse(scheduler.remove(new TestRunnable())); - assertTrue(scheduler.remove(task)); - } finally { - blockingRunnable.unblock(); - scheduler.shutdownNow(); - } - } - - private class SchedulerLimiterFactory implements SimpleSchedulerFactory { - private final List executors; - private final boolean addSubPoolName; - - private SchedulerLimiterFactory(boolean addSubPoolName) { - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - // ignored - } - }); - - executors = new LinkedList(); - this.addSubPoolName = addSubPoolName; - } - - @Override - public SimpleSchedulerLimiter makeSimpleScheduler(int poolSize, boolean prestartIfAvailable) { - PriorityScheduledExecutor executor = new PriorityScheduledExecutor(poolSize, poolSize, - 1000 * 10); - if (prestartIfAvailable) { - executor.prestartAllCoreThreads(); - } - executors.add(executor); - - if (addSubPoolName) { - return new SimpleSchedulerLimiter(executor, poolSize, "TestSubPool"); - } else { - return new SimpleSchedulerLimiter(executor, poolSize); - } - } - - @Override - public void shutdown() { - Iterator it = executors.iterator(); - while (it.hasNext()) { - it.next().shutdown(); - it.remove(); - } - } - } -}