Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,33 @@ public class JobRunnerDelegate implements Runnable {
private TaskTrackerMonitor monitor;
private Interruptible interruptor;
private JobRunner curJobRunner;
private boolean isInterruptibleJobRunner = false;
private boolean interrupted = false;

public JobRunnerDelegate(TaskTrackerAppContext appContext,
JobWrapper jobWrapper, RunnerCallback callback) {
this.appContext = appContext;
this.callback = callback;
this.jobWrapper = jobWrapper;

this.isInterruptibleJobRunner = isInterruptibleJobRunner(this.appContext);
this.logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
appContext.getBizLogLevel(),
appContext.getRemotingClient(), appContext);
monitor = (TaskTrackerMonitor) appContext.getMonitor();

if (isInterruptibleJobRunner()) {
this.interruptor = new Interruptible() {
@Override
public void interrupt() {
JobRunnerDelegate.this.interrupt();
}
};
}
this.interruptor = new Interruptible() {
@Override
public void interrupt() {
JobRunnerDelegate.this.interrupt();
}
};
}

@Override
public void run() {
try {
if (isInterruptibleJobRunner()) {
blockedOn(interruptor);
if (Thread.currentThread().isInterrupted()) {
interruptor.interrupt();
}
blockedOn(interruptor);
if (Thread.currentThread().isInterrupted()) {
interruptor.interrupt();
}

LtsLoggerFactory.setLogger(logger);
Expand Down Expand Up @@ -116,32 +111,30 @@ public void run() {
} catch (Throwable t) {
LOGGER.warn("monitor error:" + t.getMessage(), t);
}

if (isInterrupted()) {
// 如果当前线程被阻断了,那么也就不接受新任务了
response.setReceiveNewJob(false);
}
this.jobWrapper = callback.runComplete(response);

}
} finally {
LtsLoggerFactory.remove();

if (isInterruptibleJobRunner()) {
blockedOn(null);
}
blockedOn(null);
}
}

private void interrupt() {
interrupted = true;

if (this.curJobRunner != null && this.curJobRunner instanceof InterruptibleJobRunner) {
((InterruptibleJobRunner) this.curJobRunner).interrupt();
}
}

private static boolean isInterruptibleJobRunner(TaskTrackerAppContext appContext) {
Class<?> jobRunnerClass = appContext.getJobRunnerClass();
return jobRunnerClass != null && InterruptibleJobRunner.class.isAssignableFrom(appContext.getJobRunnerClass());
}

private boolean isInterruptibleJobRunner() {
return this.isInterruptibleJobRunner;
private boolean isInterrupted() {
return this.interrupted;
}

private void monitor(Action action) {
Expand Down