diff --git a/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java b/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java index deff441e3..84bbe5051 100644 --- a/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java +++ b/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java @@ -62,7 +62,7 @@ public static void testMysqlQueue() { // 节点信息配置 jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); // jobTracker.setRegistryAddress("redis://127.0.0.1:6379"); - jobTracker.setListenPort(35002); // 默认 35001 + jobTracker.setListenPort(35001); // 默认 35001 jobTracker.setClusterName("test_cluster"); jobTracker.addMasterChangeListener(new MasterChangeListenerImpl()); diff --git a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java index 059f9b3ef..61c1cbc53 100644 --- a/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java +++ b/lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java @@ -9,10 +9,8 @@ import com.lts.tasktracker.Result; import com.lts.tasktracker.domain.Response; import com.lts.tasktracker.domain.TaskTrackerAppContext; -import com.lts.tasktracker.logger.BizLogger; import com.lts.tasktracker.logger.BizLoggerAdapter; import com.lts.tasktracker.logger.BizLoggerFactory; -import com.lts.tasktracker.logger.BizLoggerImpl; import com.lts.tasktracker.monitor.TaskTrackerMonitor; import sun.nio.ch.Interruptible; @@ -37,31 +35,38 @@ public class JobRunnerDelegate implements Runnable { private TaskTrackerMonitor monitor; private Interruptible interruptor; private JobRunner curJobRunner; + private boolean isInterruptibleJobRunner = false; public JobRunnerDelegate(TaskTrackerAppContext appContext, JobWrapper jobWrapper, RunnerCallback callback) { - this.jobWrapper = jobWrapper; - this.callback = callback; this.appContext = appContext; - this.logger = (BizLoggerAdapter)BizLoggerFactory.getLogger( + this.callback = callback; + this.jobWrapper = jobWrapper; + + this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext); + this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger( appContext.getBizLogLevel(), appContext.getRemotingClient(), appContext); monitor = (TaskTrackerMonitor) appContext.getMonitor(); - this.interruptor = new Interruptible() { - @Override - public void interrupt() { - JobRunnerDelegate.this.interrupt(); - } - }; + + if (isInterruptibleJobRunner()) { + this.interruptor = new Interruptible() { + @Override + public void interrupt() { + JobRunnerDelegate.this.interrupt(); + } + }; + } } @Override public void run() { try { - - blockedOn(interruptor); - if (Thread.currentThread().isInterrupted()) { - interruptor.interrupt(); + if (isInterruptibleJobRunner()) { + blockedOn(interruptor); + if (Thread.currentThread().isInterrupted()) { + interruptor.interrupt(); + } } LtsLoggerFactory.setLogger(logger); @@ -75,9 +80,8 @@ public void run() { try { appContext.getRunnerPool().getRunningJobManager() .in(jobWrapper.getJobId()); - this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner(); - Result result = curJobRunner.run(jobWrapper.getJob()); + Result result = this.curJobRunner.run(jobWrapper.getJob()); if (result == null) { response.setAction(Action.EXECUTE_SUCCESS); @@ -113,11 +117,15 @@ public void run() { LOGGER.warn("monitor error:" + t.getMessage(), t); } - jobWrapper = callback.runComplete(response); + this.jobWrapper = callback.runComplete(response); + } } finally { LtsLoggerFactory.remove(); - blockedOn(null); + + if (isInterruptibleJobRunner()) { + blockedOn(null); + } } } @@ -127,6 +135,15 @@ private void interrupt() { } } + private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) { + Class jobRunnerClass = appContext.getJobRunnerClass(); + return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass()); + } + + private boolean isInterruptibleJobRunner() { + return this.isInterruptibleJobRunner; + } + private void monitor(Action action) { if (action == null) { return; diff --git a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java index e96aaebe5..d548d090f 100644 --- a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java +++ b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java @@ -4,16 +4,22 @@ import com.lts.core.json.JSON; import com.lts.tasktracker.Result; +import java.util.concurrent.atomic.AtomicLong; + /** * @author Robert HG (254963746@qq.com) on 2/21/16. */ public class NormalJobRunner implements JobRunner { - boolean stop = false; + + protected boolean stop = false; + + public static AtomicLong l = new AtomicLong(0); + @Override public Result run(Job job) throws Throwable { System.out.println("我开始执行:" + JSON.toJSONString(job)); while (!stop) { - int i = 1; + l.incrementAndGet(); } System.out.println("我退出了"); return null; diff --git a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java index 9df4541c3..962ceccb1 100644 --- a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java +++ b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java @@ -30,8 +30,8 @@ public void testInterruptor() throws NoAvailableJobRunnerException { TaskTrackerAppContext appContext = new TaskTrackerAppContext(); appContext.setConfig(config); appContext.setEventCenter(new InjvmEventCenter()); -// appContext.setJobRunnerClass(TestInterruptorJobRunner.class); - appContext.setJobRunnerClass(NormalJobRunner.class); + appContext.setJobRunnerClass(TestInterruptorJobRunner.class); +// appContext.setJobRunnerClass(NormalJobRunner.class); RunnerPool runnerPool = new RunnerPool(appContext); @@ -57,7 +57,7 @@ public JobWrapper runComplete(Response response) { jobWrapper.setJob(job); runnerPool.execute(jobWrapper, callback); - + System.out.println(runnerPool.getAvailablePoolSize()); try { Thread.sleep(5000L); @@ -66,6 +66,18 @@ public JobWrapper runComplete(Response response) { } // 5s之后停止 runnerPool.stopWorking(); + + while(true){ + try { + // 如果这个数字还在增长,表示线程还在执行,测试发现 NormalJobRunner 确实还在执行 TestInterruptorJobRunner 会停止 + System.out.println(NormalJobRunner.l); + Thread.sleep(1000L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(runnerPool.getAvailablePoolSize()); + } + } } \ No newline at end of file diff --git a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/TestInterruptorJobRunner.java b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/TestInterruptorJobRunner.java index 93d539dc8..307843d08 100644 --- a/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/TestInterruptorJobRunner.java +++ b/lts-tasktracker/src/test/java/com/lts/tasktracker/runner/TestInterruptorJobRunner.java @@ -1,29 +1,13 @@ package com.lts.tasktracker.runner; -import com.lts.core.domain.Job; -import com.lts.core.json.JSON; -import com.lts.tasktracker.Result; - /** * @author Robert HG (254963746@qq.com) on 2/21/16. */ -public class TestInterruptorJobRunner implements InterruptibleJobRunner { - - private boolean stop = false; +public class TestInterruptorJobRunner extends NormalJobRunner implements InterruptibleJobRunner { @Override public void interrupt() { System.out.println("我设置停止标识"); stop = true; } - - @Override - public Result run(Job job) throws Throwable { - System.out.println("我开始执行:" + JSON.toJSONString(job)); - while (!stop) { - int i = 1; - } - System.out.println("我退出了"); - return null; - } }