Skip to content

Commit

Permalink
Updating cancel tracker eviction logic and adding desired UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Jun 8, 2023
1 parent ee0d61a commit ba766f3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ public void onTaskCompleted(Task task) {
if (!TASKS_TO_TRACK.contains(task.getClass())) {
return;
}
if (!this.cancelledTaskTracker.containsKey(task.getId())) {
return;
}
this.cancelledTaskTracker.remove(task.getId());
this.cancelledTaskTracker.entrySet().removeIf(entry -> entry.getKey() == task.getId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void cleanup() {
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
}

public void testWithShouldRunDisabled() {
public void testWithNoCurrentRunningCancelledTasks() {
TaskCancellationMonitoringSettings settings = new TaskCancellationMonitoringSettings(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testWithNonZeroCancelledSearchShardTasksRunning() throws Interrupted
stats = taskCancellationMonitoringService.stats();
assertEquals(numberOfTasksCancelled, stats.getSearchShardTaskCancellationStats().getCurrentLongRunningCancelledTaskCount());
assertEquals(numberOfTasksCancelled, stats.getSearchShardTaskCancellationStats().getTotalLongRunningCancelledTaskCount());
completeTasksConcurrently(tasks).await();
completeTasksConcurrently(tasks, 0, tasks.size() - 1).await();
taskCancellationMonitoringService.doRun(); // 3rd run to verify current count is 0 and total remains the same.
stats = taskCancellationMonitoringService.stats();
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().isEmpty());
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testShouldRunGetsDisabledAfterTaskCompletion() throws InterruptedExc
assertEquals(numTasks, stats.getSearchShardTaskCancellationStats().getCurrentLongRunningCancelledTaskCount());
assertEquals(numTasks, stats.getSearchShardTaskCancellationStats().getTotalLongRunningCancelledTaskCount());

completeTasksConcurrently(tasks).await();
completeTasksConcurrently(tasks, 0, tasks.size() - 1).await();
stats = taskCancellationMonitoringService.stats();
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().isEmpty());
assertEquals(0, stats.getSearchShardTaskCancellationStats().getCurrentLongRunningCancelledTaskCount());
Expand Down Expand Up @@ -206,14 +206,72 @@ public void testWithVaryingCancelledTasksDuration() throws InterruptedException
assertEquals(numTasks, stats.getSearchShardTaskCancellationStats().getCurrentLongRunningCancelledTaskCount());
assertEquals(numTasks, stats.getSearchShardTaskCancellationStats().getTotalLongRunningCancelledTaskCount());

completeTasksConcurrently(tasks).await();
completeTasksConcurrently(tasks, 0, tasks.size() - 1).await();
taskCancellationMonitoringService.doRun();
stats = taskCancellationMonitoringService.stats();
// Verify no current running tasks
assertEquals(0, stats.getSearchShardTaskCancellationStats().getCurrentLongRunningCancelledTaskCount());
assertEquals(numTasks, stats.getSearchShardTaskCancellationStats().getTotalLongRunningCancelledTaskCount());
}

public void testTasksAreGettingEvictedCorrectlyAfterCompletion() throws InterruptedException {
Settings settings = Settings.builder()
.put(DURATION_MILLIS_SETTING.getKey(), 0) // Setting to one for testing
.build();
TaskCancellationMonitoringSettings taskCancellationMonitoringSettings = new TaskCancellationMonitoringSettings(
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

TaskCancellationMonitoringService taskCancellationMonitoringService = new TaskCancellationMonitoringService(
threadPool,
taskManager,
taskCancellationMonitoringSettings
);

// Start few tasks.
int numTasks = randomIntBetween(5, 50);
List<SearchShardTask> tasks = createTasks(numTasks);
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().isEmpty());
int numTasksToBeCancelledInFirstIteration = randomIntBetween(2, numTasks - 1);
CountDownLatch countDownLatch = cancelTasksConcurrently(tasks, 0, numTasksToBeCancelledInFirstIteration - 1);
countDownLatch.await(); // Wait for all tasks to be cancelled in first iteration

assertEquals(numTasksToBeCancelledInFirstIteration, taskCancellationMonitoringService.getCancelledTaskTracker().size());
// Verify desired task ids are present.
for (int itr = 0; itr < numTasksToBeCancelledInFirstIteration; itr++) {
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(itr).getId()));
}
// Cancel rest of the tasks
cancelTasksConcurrently(tasks, numTasksToBeCancelledInFirstIteration, numTasks - 1).await();
for (int itr = 0; itr < tasks.size(); itr++) {
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(itr).getId()));
}
// Complete one task to start with.
completeTasksConcurrently(tasks, 0, 0).await();
assertFalse(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(0).getId()));
// Verify rest of the tasks are still present in tracker
for (int itr = 1; itr < tasks.size(); itr++) {
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(itr).getId()));
}
// Complete first iteration tasks
completeTasksConcurrently(tasks, 1, numTasksToBeCancelledInFirstIteration - 1).await();
// Verify desired tasks were evicted from tracker map
for (int itr = 0; itr < numTasksToBeCancelledInFirstIteration; itr++) {
assertFalse(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(0).getId()));
}
// Verify rest of the tasks are still present in tracker
for (int itr = numTasksToBeCancelledInFirstIteration; itr < tasks.size(); itr++) {
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(itr).getId()));
}
// Complete all of them finally
completeTasksConcurrently(tasks, numTasksToBeCancelledInFirstIteration, tasks.size() - 1).await();
assertTrue(taskCancellationMonitoringService.getCancelledTaskTracker().isEmpty());
for (int itr = 0; itr < tasks.size(); itr++) {
assertFalse(taskCancellationMonitoringService.getCancelledTaskTracker().containsKey(tasks.get(itr).getId()));
}
}

public void testDoStartAndStop() {
TaskCancellationMonitoringSettings settings = new TaskCancellationMonitoringSettings(
Settings.EMPTY,
Expand Down Expand Up @@ -265,13 +323,16 @@ private CountDownLatch cancelTasksConcurrently(List<? extends CancellableTask> t
return countDownLatch;
}

private CountDownLatch completeTasksConcurrently(List<? extends CancellableTask> tasks) {
int numTasks = tasks.size();
Phaser phaser = new Phaser(numTasks + 1);
Thread[] threads = new Thread[numTasks];
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
int idx = i;
private CountDownLatch completeTasksConcurrently(List<? extends CancellableTask> tasks, int completeFromIdx, int completeTillIdx) {
assert completeFromIdx >= 0;
assert completeTillIdx <= tasks.size() - 1;
assert completeTillIdx >= completeFromIdx;
int totalTasksToBeCompleted = completeTillIdx - completeFromIdx + 1;
Thread[] threads = new Thread[totalTasksToBeCompleted];
Phaser phaser = new Phaser(totalTasksToBeCompleted + 1);
CountDownLatch countDownLatch = new CountDownLatch(totalTasksToBeCompleted);
for (int i = 0; i < totalTasksToBeCompleted; i++) {
int idx = i + completeFromIdx;
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
taskManager.unregister(tasks.get(idx));
Expand Down

0 comments on commit ba766f3

Please sign in to comment.