Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -94,15 +94,15 @@ public class JobsScheduler {
// Comma-separated list of the keys registered in OPERATIONS_REGISTRY.
private static final String SUPPORTED_OPERATIONS_STRING =
String.join(",", OPERATIONS_REGISTRY.keySet());

// NOTE(review): this text is a rendered diff without +/- markers. `executorService` looks like the
// pre-change field and `executors` its replacement; confirm against the merged file.
private final ExecutorService executorService;
// Thread pool that run() submits operation tasks to, inspects on timeout via getQueue(), and
// shuts down once all futures have been processed.
private final ThreadPoolExecutor executors;
// Factory for operation tasks, assigned in the constructor; its use is outside the visible hunks.
private final OperationTaskFactory<? extends OperationTask> taskFactory;
// Tables client assigned in the constructor; its use is outside the visible hunks.
private final TablesClient tablesClient;

/**
 * Creates a scheduler and stores each collaborator in the field of the same name.
 *
 * <p>NOTE(review): this span is a rendered diff without +/- markers, so both the {@code
 * ExecutorService executorService} parameter/assignment and the {@code ThreadPoolExecutor
 * executors} parameter/assignment appear. The call site in {@code main} passes a {@code
 * ThreadPoolExecutor}, so presumably only the latter exists in the merged file; confirm.
 *
 * @param executors thread pool kept for submitting operation tasks
 * @param taskFactory operation task factory kept in {@code taskFactory}
 * @param tablesClient tables client kept in {@code tablesClient}
 */
public JobsScheduler(
ExecutorService executorService,
ThreadPoolExecutor executors,
OperationTaskFactory<? extends OperationTask> taskFactory,
TablesClient tablesClient) {
this.executorService = executorService;
this.executors = executors;
this.taskFactory = taskFactory;
this.tablesClient = tablesClient;
}
Expand All @@ -124,11 +124,14 @@ public static void main(String[] args) {
OperationTask.POLL_INTERVAL_MS_DEFAULT),
NumberUtils.toLong(
cmdLine.getOptionValue("taskTimeoutMs"), OperationTask.TIMEOUT_MS_DEFAULT));
JobsScheduler app =
new JobsScheduler(
Executors.newFixedThreadPool(getNumParallelJobs(cmdLine)),
tasksFactory,
tablesClientFactory.create());
ThreadPoolExecutor executors =
new ThreadPoolExecutor(
getNumParallelJobs(cmdLine),
getNumParallelJobs(cmdLine),
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
JobsScheduler app = new JobsScheduler(executors, tasksFactory, tablesClientFactory.create());
app.run(
operationType,
operationTaskCls.toString(),
Expand Down Expand Up @@ -162,7 +165,7 @@ protected void run(
log.info("Submitting and running {} jobs based on the job type: {}", taskList.size(), jobType);
List<Future<Optional<JobState>>> taskFutures = new ArrayList<>();
for (OperationTask<?> operationTask : taskList) {
taskFutures.add(executorService.submit(operationTask));
taskFutures.add(executors.submit(operationTask));
}

int emptyStateJobCount = 0;
Expand All @@ -173,31 +176,33 @@ protected void run(
try {
long passedTimeMillis = System.currentTimeMillis() - startTimeMillis;
long remainingTimeMillis = TimeUnit.HOURS.toMillis(tasksWaitHours) - passedTimeMillis;
if (remainingTimeMillis <= 0) {
// treat as a global timeout case similar to future.get timeout
throw new TimeoutException();
}
jobState = taskFuture.get(remainingTimeMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
log.error(String.format("Operation for %s failed with exception", task), e);
jobStateCountMap.put(JobState.FAILED, jobStateCountMap.get(JobState.FAILED) + 1);
} catch (InterruptedException e) {
throw new RuntimeException("Scheduler thread is interrupted, shutting down", e);
} catch (TimeoutException e) {
if (!taskFuture.isDone()) {
// Clear queue to stop internal tasks submission
if (!executors.getQueue().isEmpty()) {
log.warn(
"Attempting to cancel job for {} because of timeout of {} hours",
task,
tasksWaitHours);
if (taskFuture.cancel(true)) {
log.warn("Cancelled job for {} because of timeout of {} hours", task, tasksWaitHours);
jobStateCountMap.put(JobState.CANCELLED, jobStateCountMap.get(JobState.CANCELLED) + 1);
}
"Drops {} tasks for job type {} from wait queue due to timeout",
executors.getQueue().size(),
jobType);
executors.getQueue().clear();
}
log.warn(
"Attempting to cancel job for {} because of timeout of {} hours", task, tasksWaitHours);
if (taskFuture.cancel(true)) {
log.warn("Cancelled job for {} because of timeout of {} hours", task, tasksWaitHours);
}
} finally {
if (jobState.isPresent()) {
jobStateCountMap.put(jobState.get(), jobStateCountMap.get(jobState.get()) + 1);
} else if (taskFuture.isCancelled()) {
jobStateCountMap.put(JobState.CANCELLED, jobStateCountMap.get(JobState.CANCELLED) + 1);
} else {
// Jobs that are skipped due to replica or missing retention policy, etc.
emptyStateJobCount++;
}
}
Expand All @@ -211,7 +216,7 @@ protected void run(
jobStateCountMap.get(JobState.CANCELLED),
jobStateCountMap.get(JobState.FAILED),
emptyStateJobCount);
executorService.shutdown();
executors.shutdown();
METER.counterBuilder("scheduler_end_count").build().add(1);
reportMetrics(jobStateCountMap, taskType, startTimeMillis);
}
Expand Down
Loading