Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static void testMysqlQueue() {
// 节点信息配置
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
// jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
jobTracker.setListenPort(35002); // 默认 35001
jobTracker.setListenPort(35001); // 默认 35001
jobTracker.setClusterName("test_cluster");

jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
import com.lts.tasktracker.Result;
import com.lts.tasktracker.domain.Response;
import com.lts.tasktracker.domain.TaskTrackerAppContext;
import com.lts.tasktracker.logger.BizLogger;
import com.lts.tasktracker.logger.BizLoggerAdapter;
import com.lts.tasktracker.logger.BizLoggerFactory;
import com.lts.tasktracker.logger.BizLoggerImpl;
import com.lts.tasktracker.monitor.TaskTrackerMonitor;
import sun.nio.ch.Interruptible;

Expand All @@ -37,31 +35,38 @@ public class JobRunnerDelegate implements Runnable {
private TaskTrackerMonitor monitor;
private Interruptible interruptor;
private JobRunner curJobRunner;
private boolean isInterruptibleJobRunner = false;

public JobRunnerDelegate(TaskTrackerAppContext appContext,
JobWrapper jobWrapper, RunnerCallback callback) {
this.jobWrapper = jobWrapper;
this.callback = callback;
this.appContext = appContext;
this.logger = (BizLoggerAdapter)BizLoggerFactory.getLogger(
this.callback = callback;
this.jobWrapper = jobWrapper;

this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext);
this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
appContext.getBizLogLevel(),
appContext.getRemotingClient(), appContext);
monitor = (TaskTrackerMonitor) appContext.getMonitor();
this.interruptor = new Interruptible() {
@Override
public void interrupt() {
JobRunnerDelegate.this.interrupt();
}
};

if (isInterruptibleJobRunner()) {
this.interruptor = new Interruptible() {
@Override
public void interrupt() {
JobRunnerDelegate.this.interrupt();
}
};
}
}

@Override
public void run() {
try {

blockedOn(interruptor);
if (Thread.currentThread().isInterrupted()) {
interruptor.interrupt();
if (isInterruptibleJobRunner()) {
blockedOn(interruptor);
if (Thread.currentThread().isInterrupted()) {
interruptor.interrupt();
}
}

LtsLoggerFactory.setLogger(logger);
Expand All @@ -75,9 +80,8 @@ public void run() {
try {
appContext.getRunnerPool().getRunningJobManager()
.in(jobWrapper.getJobId());

this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
Result result = curJobRunner.run(jobWrapper.getJob());
Result result = this.curJobRunner.run(jobWrapper.getJob());

if (result == null) {
response.setAction(Action.EXECUTE_SUCCESS);
Expand Down Expand Up @@ -113,11 +117,15 @@ public void run() {
LOGGER.warn("monitor error:" + t.getMessage(), t);
}

jobWrapper = callback.runComplete(response);
this.jobWrapper = callback.runComplete(response);

}
} finally {
LtsLoggerFactory.remove();
blockedOn(null);

if (isInterruptibleJobRunner()) {
blockedOn(null);
}
}
}

Expand All @@ -127,6 +135,15 @@ private void interrupt() {
}
}

private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) {
Class<?> jobRunnerClass = appContext.getJobRunnerClass();
return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass());
}

private boolean isInterruptibleJobRunner() {
return this.isInterruptibleJobRunner;
}

private void monitor(Action action) {
if (action == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@
import com.lts.core.json.JSON;
import com.lts.tasktracker.Result;

import java.util.concurrent.atomic.AtomicLong;

/**
* @author Robert HG (254963746@qq.com) on 2/21/16.
*/
public class NormalJobRunner implements JobRunner {
boolean stop = false;

protected boolean stop = false;

public static AtomicLong l = new AtomicLong(0);

@Override
public Result run(Job job) throws Throwable {
System.out.println("我开始执行:" + JSON.toJSONString(job));
while (!stop) {
int i = 1;
l.incrementAndGet();
}
System.out.println("我退出了");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public void testInterruptor() throws NoAvailableJobRunnerException {
TaskTrackerAppContext appContext = new TaskTrackerAppContext();
appContext.setConfig(config);
appContext.setEventCenter(new InjvmEventCenter());
// appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
appContext.setJobRunnerClass(NormalJobRunner.class);
appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
// appContext.setJobRunnerClass(NormalJobRunner.class);

RunnerPool runnerPool = new RunnerPool(appContext);

Expand All @@ -57,7 +57,7 @@ public JobWrapper runComplete(Response response) {
jobWrapper.setJob(job);

runnerPool.execute(jobWrapper, callback);

System.out.println(runnerPool.getAvailablePoolSize());

try {
Thread.sleep(5000L);
Expand All @@ -66,6 +66,18 @@ public JobWrapper runComplete(Response response) {
}
// 5s之后停止
runnerPool.stopWorking();

while(true){
try {
// 如果这个数字还在增长,表示线程还在执行,测试发现 NormalJobRunner 确实还在执行 TestInterruptorJobRunner 会停止
System.out.println(NormalJobRunner.l);
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(runnerPool.getAvailablePoolSize());
}

}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
package com.lts.tasktracker.runner;

import com.lts.core.domain.Job;
import com.lts.core.json.JSON;
import com.lts.tasktracker.Result;

/**
* @author Robert HG (254963746@qq.com) on 2/21/16.
*/
public class TestInterruptorJobRunner implements InterruptibleJobRunner {

private boolean stop = false;
public class TestInterruptorJobRunner extends NormalJobRunner implements InterruptibleJobRunner {

@Override
public void interrupt() {
System.out.println("我设置停止标识");
stop = true;
}

@Override
public Result run(Job job) throws Throwable {
System.out.println("我开始执行:" + JSON.toJSONString(job));
while (!stop) {
int i = 1;
}
System.out.println("我退出了");
return null;
}
}