Permalink
Browse files

Fixed concurrency bug in test: Task froze on testHasEnded.await() whi…

…le main thread waits at pool.awaitForCurrentTasksToFinish(), because clientIsWaitingForTasksToFinish.countDown() was called before the client has begun waiting on pool.awaitForCurrentTasksToFinish()
  • Loading branch information...
orfjackal committed Dec 2, 2008
1 parent d9df888 commit 7ef69c7927ade13147b1a1ea63add6cb406dabf9
@@ -33,11 +33,13 @@
import com.google.inject.*;
import net.orfjackal.dimdwarf.tasks.TaskExecutor;
+import org.jetbrains.annotations.TestOnly;
import org.slf4j.*;
import javax.annotation.concurrent.ThreadSafe;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Esko Luontola
@@ -54,6 +56,7 @@
private final Thread consumer;
private final ExecutorService workers;
private final Set<CountDownLatch> runningTasks = Collections.synchronizedSet(new HashSet<CountDownLatch>());
+ private final AtomicInteger waitingForCurrentTasksToFinish = new AtomicInteger(0);
private volatile boolean shutdown = false;
@Inject
@@ -115,11 +118,21 @@ public void awaitForCurrentTasksToFinish() throws InterruptedException {
// big and would contain a null entry (toArray() does not shrink the array parameter
// if it's too big).
CountDownLatch[] snapshotOfRunningTasks = runningTasks.toArray(new CountDownLatch[0]);
- for (CountDownLatch taskHasFinished : snapshotOfRunningTasks) {
- taskHasFinished.await();
+ waitingForCurrentTasksToFinish.incrementAndGet();
+ try {
+ for (CountDownLatch taskHasFinished : snapshotOfRunningTasks) {
+ taskHasFinished.await();
+ }
+ } finally {
+ waitingForCurrentTasksToFinish.decrementAndGet();
}
}
+ @TestOnly
+ int getWaitingForCurrentTasksToFinishCount() {
+ return waitingForCurrentTasksToFinish.get();
+ }
+
private class TaskConsumer implements Runnable {
public void run() {
@@ -92,25 +92,6 @@ public void destroy() throws Exception {
pool.shutdown();
}
- private static void executeAfterCurrentThreadIsWaiting(final Runnable command) {
- final Thread currentThread = Thread.currentThread();
- Thread t = new Thread(new Runnable() {
- public void run() {
- Thread.State state = currentThread.getState();
- if (state.equals(Thread.State.RUNNABLE)) {
- try {
- Thread.sleep(100); // TODO: figure out a more reliable thread synchronization method than sleeping
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- command.run();
- }
- });
- t.setPriority(Thread.MIN_PRIORITY);
- t.start();
- }
-
public class WhenTasksAreAddedToTheQueue {
@@ -251,13 +232,17 @@ public void run() {
taskQueue.add(new SimpleTaskBootstrap(task1));
firstTaskIsExecuting.await();
- executeAfterCurrentThreadIsWaiting(new Runnable() {
+ Thread t = new Thread(new Runnable() {
public void run() {
- // Let's hope that this gets executed *after* the client begins waiting.
- // There is no guarantee that this thread won't be executed first...
+ while (pool.getWaitingForCurrentTasksToFinishCount() == 0) {
+ Thread.yield();
+ }
clientIsWaitingForTasksToFinish.countDown();
}
});
+ t.setPriority(Thread.MIN_PRIORITY);
+ t.start();
+
pool.awaitForCurrentTasksToFinish();
}

0 comments on commit 7ef69c7

Please sign in to comment.