Skip to content

Commit

Permalink
Refactoring changes
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Oct 22, 2022
1 parent 8d77884 commit de64339
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
List<Runnable> callbacks = new ArrayList<>();

for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
if (reason.isPresent()) {
reasons.add(reason.get());
callbacks.add(tracker::incrementCancellations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER;

/**
* CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed.
*
* @opensearch.internal
*/
public class CpuUsageTracker extends TaskResourceUsageTracker {
public static final String NAME = "cpu_usage_tracker";

private final LongSupplier cpuTimeNanosThresholdSupplier;

public CpuUsageTracker(SearchBackpressureSettings settings) {
Expand All @@ -33,14 +33,11 @@ public CpuUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return CPU_USAGE_TRACKER.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = cpuTimeNanosThresholdSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER;

/**
* ElapsedTimeTracker evaluates if the task has been running for more time than allowed.
*
* @opensearch.internal
*/
public class ElapsedTimeTracker extends TaskResourceUsageTracker {
public static final String NAME = "elapsed_time_tracker";

private final LongSupplier timeNanosSupplier;
private final LongSupplier elapsedTimeNanosThresholdSupplier;

Expand All @@ -35,14 +35,11 @@ public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier time

@Override
public String name() {
return NAME;
return ELAPSED_TIME_TRACKER.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = elapsedTimeNanosThresholdSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER;

/**
* HeapUsageTracker evaluates if the task has consumed too much heap than allowed.
* It also compares the task's heap usage against a historical moving average of previously completed tasks.
*
* @opensearch.internal
*/
public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener {
public static final String NAME = "heap_usage_tracker";

private final LongSupplier heapBytesThresholdSupplier;
private final DoubleSupplier heapVarianceThresholdSupplier;
private final AtomicReference<MovingAverage> movingAverageReference;
Expand All @@ -44,7 +44,7 @@ public HeapUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return HEAP_USAGE_TRACKER.getName();
}

@Override
Expand All @@ -53,7 +53,7 @@ public void update(Task task) {
}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
MovingAverage movingAverage = movingAverageReference.get();

// There haven't been enough measurements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public long getCancellations() {
/**
* Notifies the tracker to update its state when a task execution completes.
*/
public abstract void update(Task task);
public void update(Task task) {}

/**
* Returns the cancellation reason for the given task, if it's eligible for cancellation.
*/
public abstract Optional<TaskCancellation.Reason> cancellationReason(Task task);
public abstract Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.trackers;

/**
* Defines the type of TaskResourceUsageTracker.
*/
public enum TaskResourceUsageTrackerType {
CPU_USAGE_TRACKER("cpu_usage_tracker"),
HEAP_USAGE_TRACKER("heap_usage_tracker"),
ELAPSED_TIME_TRACKER("elapsed_time_tracker");

private final String name;

TaskResourceUsageTrackerType(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage());
Expand All @@ -43,7 +43,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage());
Expand All @@ -44,7 +44,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testEligibleForCancellation() {

// Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance).
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200);
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(4, reason.get().getCancellationScore());
assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage());
Expand All @@ -59,7 +59,7 @@ public void testNotEligibleForCancellation() {
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99);

// Not enough observations.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Record enough observations to make the moving average 'ready'.
Expand All @@ -68,13 +68,13 @@ public void testNotEligibleForCancellation() {
}

// Task with heap usage < heapBytesThreshold should not be cancelled.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled.
double allowedHeapUsage = 99.0 * 2.0;
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1));
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
return Optional.empty();
}
};
Expand Down

0 comments on commit de64339

Please sign in to comment.