Skip to content
Merged

FIX #110

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void handleFailed(List<TaskTrackerJobResult> results) {
new ArrayList<JobFeedbackPo>(results.size());

for (TaskTrackerJobResult result : results) {
JobFeedbackPo jobFeedbackPo =
JobDomainConverter.convert(result);
JobFeedbackPo jobFeedbackPo = JobDomainConverter.convert(result);
jobFeedbackPos.add(jobFeedbackPo);
}
// 2. 失败的存储在反馈队列
Expand Down Expand Up @@ -308,92 +307,85 @@ private void finishProcess(List<TaskTrackerJobResult> results) {
for (TaskTrackerJobResult result : results) {

JobWrapper jobWrapper = result.getJobWrapper();
if (jobWrapper.getJob().isSchedule()) {

JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
if (cronJobPo == null) {
// 可能任务队列中改条记录被删除了
return;
}
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
if (nextTriggerTime == null) {
application.getCronJobQueue().remove(jobWrapper.getJobId());
return;
}
// 表示下次还要执行
try {
cronJobPo.setTaskTrackerIdentity(null);
cronJobPo.setIsRunning(false);
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
cronJobPo.setGmtModified(SystemClock.now());
application.getExecutableJobQueue().add(cronJobPo);
} catch (DuplicateJobException e) {
LOGGER.warn(e.getMessage(), e);
}
if (jobWrapper.getJob().isSchedule()) {
finishScheduleJob(jobWrapper.getJobId());
}
// 从正在执行的队列中移除
application.getExecutingJobQueue().remove(jobWrapper.getJobId());
}
}

private void finishScheduleJob(String jobId) {
JobPo cronJobPo = application.getCronJobQueue().finish(jobId);
if (cronJobPo == null) {
// 可能任务队列中改条记录被删除了
return;
}
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
if (nextTriggerTime == null) {
// 从CronJob队列中移除
application.getCronJobQueue().remove(jobId);
return;
}
// 表示下次还要执行
try {
cronJobPo.setTaskTrackerIdentity(null);
cronJobPo.setIsRunning(false);
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
cronJobPo.setGmtModified(SystemClock.now());
application.getExecutableJobQueue().add(cronJobPo);
} catch (DuplicateJobException e) {
LOGGER.warn(e.getMessage(), e);
}
}

/**
* 将任务加入重试队列
*/

private void retryProcess(List<TaskTrackerJobResult> results) {
if (CollectionUtils.isEmpty(results)) {
return;
}
for (TaskTrackerJobResult result : results) {

JobWrapper jobWrapper = result.getJobWrapper();
// 1. 加入到重试队列
JobPo jobPo = application.getExecutingJobQueue().get(jobWrapper.getJobId());
if (jobPo != null) {

// 重试次数+1
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();

boolean needAdd = true;

if (jobPo.isSchedule()) {
// 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
if (cronJobPo != null) {
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) {
// 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间
try {
cronJobPo.setTaskTrackerIdentity(null);
cronJobPo.setIsRunning(false);
cronJobPo.setTriggerTime(nextTriggerTime.getTime());
cronJobPo.setGmtModified(SystemClock.now());
application.getExecutableJobQueue().add(cronJobPo);
} catch (DuplicateJobException e) {
LOGGER.error(e.getMessage(), e);
}
needAdd = false;
}
}
}
if (needAdd) {
// 加入到队列, 重试
jobPo.setIsRunning(false);
jobPo.setTaskTrackerIdentity(null);
jobPo.setGmtModified(SystemClock.now());
// 延迟重试时间就等于重试次数(分钟)
jobPo.setTriggerTime(nextRetryTriggerTime);
try {
application.getExecutableJobQueue().add(jobPo);
} catch (DuplicateJobException e) {
LOGGER.error(e.getMessage(), e);
if (jobPo == null) { // 表示已经被删除了
continue;
}

// 重试次数+1
jobPo.setRetryTimes((jobPo.getRetryTimes() == null ? 0 : jobPo.getRetryTimes()) + 1);
Long nextRetryTriggerTime = DateUtils.addMinute(new Date(), jobPo.getRetryTimes()).getTime();

if (jobPo.isSchedule()) {
// 如果是 cron Job, 判断任务下一次执行时间和重试时间的比较
JobPo cronJobPo = application.getCronJobQueue().finish(jobWrapper.getJobId());
if (cronJobPo != null) {
Date nextTriggerTime = CronExpressionUtils.getNextTriggerTime(cronJobPo.getCronExpression());
if (nextTriggerTime != null && nextTriggerTime.getTime() < nextRetryTriggerTime) {
// 表示下次还要执行, 并且下次执行时间比下次重试时间要早, 那么不重试,直接使用下次的执行时间
nextRetryTriggerTime = nextTriggerTime.getTime();
jobPo = cronJobPo;
}
}
}

// 从正在执行的队列中移除
application.getExecutingJobQueue().remove(jobPo.getJobId());

// 加入到队列, 重试
jobPo.setTaskTrackerIdentity(null);
jobPo.setIsRunning(false);
jobPo.setGmtModified(SystemClock.now());
// 延迟重试时间就等于重试次数(分钟)
jobPo.setTriggerTime(nextRetryTriggerTime);
try {
application.getExecutableJobQueue().add(jobPo);
} catch (DuplicateJobException e) {
LOGGER.error(e.getMessage(), e);
}
// 从正在执行的队列中移除
application.getExecutingJobQueue().remove(jobPo.getJobId());
}
}
}