From 2b207d52d4272a28df93f900e33785b2a190c088 Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Thu, 24 Sep 2015 20:32:19 +0800 Subject: [PATCH] FIX --- .../processor/JobFinishedProcessor.java | 124 ++++++++---------- 1 file changed, 58 insertions(+), 66 deletions(-) diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java index 114b46c1e..22a0c4eca 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java @@ -66,8 +66,7 @@ public void handleFailed(List results) { new ArrayList(results.size()); for (TaskTrackerJobResult result : results) { - JobFeedbackPo jobFeedbackPo = - JobDomainConverter.convert(result); + JobFeedbackPo jobFeedbackPo = JobDomainConverter.convert(result); jobFeedbackPos.add(jobFeedbackPo); } // 2. 失败的存储在反馈队列 @@ -308,92 +307,85 @@ private void finishProcess(List results) { for (TaskTrackerJobResult result : results) { JobWrapper jobWrapper = result.getJobWrapper(); - if (jobWrapper.getJob().isSchedule()) { - JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId()); - if (cronJobPo == null) { - // 可能任务队列中改条记录被删除了 - return; - } - Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); - if (nextTriggerTime == null) { - application.getCronJobQueue().remove(jobWrapper.getJobId()); - return; - } - // 表示下次还要执行 - try { - cronJobPo.setTaskTrackerIdentity(null); - cronJobPo.setIsRunning(false); - cronJobPo.setTriggerTime(nextTriggerTime.getTime()); - cronJobPo.setGmtModified(SystemClock.now()); - application.getExecutableJobQueue().add(cronJobPo); - } catch (DuplicateJobException e) { - LOGGER.warn(e.getMessage(), e); - } + if (jobWrapper.getJob().isSchedule()) { + finishScheduleJob(jobWrapper.getJobId()); } // 从正在执行的队列中移除 application.getExecutingJobQueue().remove(jobWrapper.getJobId()); } } + private void finishScheduleJob(String jobId) { + JobPo cronJobPo = application.getCronJobQueue().finish(jobId); + if (cronJobPo == null) { + // 可能任务队列中改条记录被删除了 + return; + } + Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); + if (nextTriggerTime == null) { + // 从CronJob队列中移除 + application.getCronJobQueue().remove(jobId); + return; + } + // 表示下次还要执行 + try { + cronJobPo.setTaskTrackerIdentity(null); + cronJobPo.setIsRunning(false); + cronJobPo.setTriggerTime(nextTriggerTime.getTime()); + cronJobPo.setGmtModified(SystemClock.now()); + application.getExecutableJobQueue().add(cronJobPo); + } catch (DuplicateJobException e) { + LOGGER.warn(e.getMessage(), e); + } + } + /** * 将任务加入重试队列 */ - private void retryProcess(List results) { if (CollectionUtils.isEmpty(results)) { return; } for (TaskTrackerJobResult result : results) { + JobWrapper jobWrapper = result.getJobWrapper(); // 1. 加入到重试队列 JobPo jobPo = application.getExecutingJobQueue().get(jobWrapper.getJobId()); - if (jobPo != null) { - - // 重试次数+1 - jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1); - Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime(); - - boolean needAdd = true; - - if (jobPo.isSchedule()) { - // 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较 - JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId()); - if (cronJobPo != null) { - Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); - if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) { - // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 - try { - cronJobPo.setTaskTrackerIdentity(null); - cronJobPo.setIsRunning(false); - cronJobPo.setTriggerTime(nextTriggerTime.getTime()); - cronJobPo.setGmtModified(SystemClock.now()); - application.getExecutableJobQueue().add(cronJobPo); - } catch (DuplicateJobException e) { - LOGGER.error(e.getMessage(), e); - } - needAdd = false; - } - } - } - if (needAdd) { - // 加入到队列, 重试 - jobPo.setIsRunning(false); - jobPo.setTaskTrackerIdentity(null); - jobPo.setGmtModified(SystemClock.now()); - // 延迟重试时间就等于重试次数(分钟) - jobPo.setTriggerTime(nextRetryTriggerTime); - try { - application.getExecutableJobQueue().add(jobPo); - } catch (DuplicateJobException e) { - LOGGER.error(e.getMessage(), e); + if (jobPo == null) { // 表示已经被删除了 + continue; + } + + // 重试次数+1 + jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1); + Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime(); + + if (jobPo.isSchedule()) { + // 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较 + JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId()); + if (cronJobPo != null) { + Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression()); + if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) { + // 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间 + nextRetryTriggerTime = nextTriggerTime.getTime(); + jobPo = cronJobPo; } } + } - // 从正在执行的队列中移除 - application.getExecutingJobQueue().remove(jobPo.getJobId()); - + // 加入到队列, 重试 + jobPo.setTaskTrackerIdentity(null); + jobPo.setIsRunning(false); + jobPo.setGmtModified(SystemClock.now()); + // 延迟重试时间就等于重试次数(分钟) + jobPo.setTriggerTime(nextRetryTriggerTime); + try { + application.getExecutableJobQueue().add(jobPo); + } catch (DuplicateJobException e) { + LOGGER.error(e.getMessage(), e); } + // 从正在执行的队列中移除 + application.getExecutingJobQueue().remove(jobPo.getJobId()); } } }