Skip to content

Commit

Permalink
TaskHandle should not count forced splits as running splits
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Feb 12, 2015
1 parent d3fa140 commit cfe93af
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
Expand Up @@ -191,7 +191,7 @@ public synchronized List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandl
// Note: we do not record queued time for forced splits // Note: we do not record queued time for forced splits
startSplit(prioritizedSplitRunner); startSplit(prioritizedSplitRunner);
// add the runner to the handle so it can be destroyed if the task is canceled // add the runner to the handle so it can be destroyed if the task is canceled
taskHandle.recordRunningSplit(prioritizedSplitRunner); taskHandle.recordForcedRunningSplit(prioritizedSplitRunner);
} }
else { else {
// add this to the work queue for the task // add this to the work queue for the task
Expand Down Expand Up @@ -287,6 +287,7 @@ public static class TaskHandle
private final TaskId taskId; private final TaskId taskId;
private final Queue<PrioritizedSplitRunner> queuedSplits = new ArrayDeque<>(10); private final Queue<PrioritizedSplitRunner> queuedSplits = new ArrayDeque<>(10);
private final List<PrioritizedSplitRunner> runningSplits = new ArrayList<>(10); private final List<PrioritizedSplitRunner> runningSplits = new ArrayList<>(10);
private final List<PrioritizedSplitRunner> forcedRunningSplits = new ArrayList<>(10);
private final AtomicLong taskThreadUsageNanos = new AtomicLong(); private final AtomicLong taskThreadUsageNanos = new AtomicLong();


private final AtomicInteger nextSplitId = new AtomicInteger(); private final AtomicInteger nextSplitId = new AtomicInteger();
Expand All @@ -308,6 +309,11 @@ private TaskId getTaskId()


private void destroy() private void destroy()
{ {
for (PrioritizedSplitRunner runningSplit : forcedRunningSplits) {
runningSplit.destroy();
}
forcedRunningSplits.clear();

for (PrioritizedSplitRunner runningSplit : runningSplits) { for (PrioritizedSplitRunner runningSplit : runningSplits) {
runningSplit.destroy(); runningSplit.destroy();
} }
Expand All @@ -324,12 +330,13 @@ private void enqueueSplit(PrioritizedSplitRunner split)
queuedSplits.add(split); queuedSplits.add(split);
} }


private void recordRunningSplit(PrioritizedSplitRunner split) private void recordForcedRunningSplit(PrioritizedSplitRunner split)
{ {
runningSplits.add(split); forcedRunningSplits.add(split);
} }


private int getRunningSplits() @VisibleForTesting
int getRunningSplits()
{ {
return runningSplits.size(); return runningSplits.size();
} }
Expand All @@ -350,6 +357,7 @@ private PrioritizedSplitRunner pollNextSplit()


private void splitComplete(PrioritizedSplitRunner split) private void splitComplete(PrioritizedSplitRunner split)
{ {
forcedRunningSplits.remove(split);
runningSplits.remove(split); runningSplits.remove(split);
} }


Expand Down
Expand Up @@ -113,6 +113,35 @@ public void test()
} }
} }


@Test
public void testTaskHandle()
throws Exception
{
TaskExecutor taskExecutor = new TaskExecutor(4, 8);
taskExecutor.start();

try {
TaskHandle taskHandle = taskExecutor.addTask(new TaskId("test", "test", "test"));

Phaser beginPhase = new Phaser();
beginPhase.register();
Phaser verificationComplete = new Phaser();
verificationComplete.register();
TestingJob driver = new TestingJob(beginPhase, verificationComplete, 10);

// force enqueue a split
taskExecutor.enqueueSplits(taskHandle, true, ImmutableList.of(driver));
assertEquals(taskHandle.getRunningSplits(), 0);

// normal enqueue a split
taskExecutor.enqueueSplits(taskHandle, false, ImmutableList.of(driver));
assertEquals(taskHandle.getRunningSplits(), 1);
}
finally {
taskExecutor.stop();
}
}

private static class TestingJob private static class TestingJob
implements SplitRunner implements SplitRunner
{ {
Expand Down

0 comments on commit cfe93af

Please sign in to comment.