From 6beb9a8fe2bd4594cb290dab30262e73f1fa0e39 Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Sun, 21 Feb 2016 23:33:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=9C=89=E4=B8=AD=E6=96=AD?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E7=9A=84=20=20=20InterruptibleJobRunner?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tasktracker/runner/JobRunnerDelegate.java | 45 ++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java index 61c1cbc53..ba6f13723 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java @@ -35,7 +35,7 @@ public class JobRunnerDelegate implements Runnable { private TaskTrackerMonitor monitor; private Interruptible interruptor; private JobRunner curJobRunner; - private boolean isInterruptibleJobRunner = false; + private boolean interrupted = false; public JobRunnerDelegate(TaskTrackerAppContext appContext, JobWrapper jobWrapper, RunnerCallback callback) { @@ -43,30 +43,25 @@ public JobRunnerDelegate(TaskTrackerAppContext appContext, this.callback = callback; this.jobWrapper = jobWrapper; - this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext); this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger( appContext.getBizLogLevel(), appContext.getRemotingClient(), appContext); monitor = (TaskTrackerMonitor) appContext.getMonitor(); - if (isInterruptibleJobRunner()) { - this.interruptor = new Interruptible() { - @Override - public void interrupt() { - JobRunnerDelegate.this.interrupt(); - } - }; - } + this.interruptor = new Interruptible() { + @Override + public void interrupt() { + JobRunnerDelegate.this.interrupt(); + } + }; } @Override public void run() { try { - if (isInterruptibleJobRunner()) { - blockedOn(interruptor); - if (Thread.currentThread().isInterrupted()) { - interruptor.interrupt(); - } + blockedOn(interruptor); + if (Thread.currentThread().isInterrupted()) { + interruptor.interrupt(); } LtsLoggerFactory.setLogger(logger); @@ -116,32 +111,30 @@ public void run() { } catch (Throwable t) { LOGGER.warn("monitor error:" + t.getMessage(), t); } - + if (isInterrupted()) { + // 如果当前线程被阻断了,那么也就不接受新任务了 + response.setReceiveNewJob(false); + } this.jobWrapper = callback.runComplete(response); } } finally { LtsLoggerFactory.remove(); - if (isInterruptibleJobRunner()) { - blockedOn(null); - } + blockedOn(null); } } private void interrupt() { + interrupted = true; + if (this.curJobRunner != null && this.curJobRunner instanceof InterruptibleJobRunner) { ((InterruptibleJobRunner) this.curJobRunner).interrupt(); } } - private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) { - Class jobRunnerClass = appContext.getJobRunnerClass(); - return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass()); - } - - private boolean isInterruptibleJobRunner() { - return this.isInterruptibleJobRunner; + private boolean isInterrupted() { + return this.interrupted; } private void monitor(Action action) {