diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java index a968bbd96..7c82e997c 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java @@ -33,9 +33,9 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; @@ -94,15 +94,15 @@ public class JobsScheduler { private static final String SUPPORTED_OPERATIONS_STRING = String.join(",", OPERATIONS_REGISTRY.keySet()); - private final ExecutorService executorService; + private final ThreadPoolExecutor executors; private final OperationTaskFactory taskFactory; private final TablesClient tablesClient; public JobsScheduler( - ExecutorService executorService, + ThreadPoolExecutor executors, OperationTaskFactory taskFactory, TablesClient tablesClient) { - this.executorService = executorService; + this.executors = executors; this.taskFactory = taskFactory; this.tablesClient = tablesClient; } @@ -124,11 +124,14 @@ public static void main(String[] args) { OperationTask.POLL_INTERVAL_MS_DEFAULT), NumberUtils.toLong( cmdLine.getOptionValue("taskTimeoutMs"), OperationTask.TIMEOUT_MS_DEFAULT)); - JobsScheduler app = - new JobsScheduler( - Executors.newFixedThreadPool(getNumParallelJobs(cmdLine)), - tasksFactory, - tablesClientFactory.create()); + ThreadPoolExecutor executors = + new ThreadPoolExecutor( + getNumParallelJobs(cmdLine), + getNumParallelJobs(cmdLine), + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + JobsScheduler app = new JobsScheduler(executors, tasksFactory, tablesClientFactory.create()); app.run( operationType, operationTaskCls.toString(), @@ -162,7 +165,7 @@ protected void run( log.info("Submitting and running {} jobs based on the job type: {}", taskList.size(), jobType); List>> taskFutures = new ArrayList<>(); for (OperationTask operationTask : taskList) { - taskFutures.add(executorService.submit(operationTask)); + taskFutures.add(executors.submit(operationTask)); } int emptyStateJobCount = 0; @@ -173,10 +176,6 @@ protected void run( try { long passedTimeMillis = System.currentTimeMillis() - startTimeMillis; long remainingTimeMillis = TimeUnit.HOURS.toMillis(tasksWaitHours) - passedTimeMillis; - if (remainingTimeMillis <= 0) { - // treat as a global timeout case similar to future.get timeout - throw new TimeoutException(); - } jobState = taskFuture.get(remainingTimeMillis, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { log.error(String.format("Operation for %s failed with exception", task), e); @@ -184,20 +183,26 @@ protected void run( } catch (InterruptedException e) { throw new RuntimeException("Scheduler thread is interrupted, shutting down", e); } catch (TimeoutException e) { - if (!taskFuture.isDone()) { + // Clear queue to stop internal tasks submission + if (!executors.getQueue().isEmpty()) { log.warn( - "Attempting to cancel job for {} because of timeout of {} hours", - task, - tasksWaitHours); - if (taskFuture.cancel(true)) { - log.warn("Cancelled job for {} because of timeout of {} hours", task, tasksWaitHours); - jobStateCountMap.put(JobState.CANCELLED, jobStateCountMap.get(JobState.CANCELLED) + 1); - } + "Drops {} tasks for job type {} from wait queue due to timeout", + executors.getQueue().size(), + jobType); + executors.getQueue().clear(); + } + log.warn( + "Attempting to cancel job for {} because of timeout of {} hours", task, tasksWaitHours); + if (taskFuture.cancel(true)) { + log.warn("Cancelled job for {} because of timeout of {} hours", task, tasksWaitHours); } } finally { if (jobState.isPresent()) { jobStateCountMap.put(jobState.get(), jobStateCountMap.get(jobState.get()) + 1); + } else if (taskFuture.isCancelled()) { + jobStateCountMap.put(JobState.CANCELLED, jobStateCountMap.get(JobState.CANCELLED) + 1); } else { + // Jobs that are skipped due to replica or missing retention policy, etc. emptyStateJobCount++; } } @@ -211,7 +216,7 @@ protected void run( jobStateCountMap.get(JobState.CANCELLED), jobStateCountMap.get(JobState.FAILED), emptyStateJobCount); - executorService.shutdown(); + executors.shutdown(); METER.counterBuilder("scheduler_end_count").build().add(1); reportMetrics(jobStateCountMap, taskType, startTimeMillis); }