Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Job Runner 的代理类,
Expand All @@ -35,7 +36,7 @@ public class JobRunnerDelegate implements Runnable {
private TaskTrackerMonitor monitor;
private Interruptible interruptor;
private JobRunner curJobRunner;
private boolean interrupted = false;
private AtomicBoolean interrupted = new AtomicBoolean(false);

public JobRunnerDelegate(TaskTrackerAppContext appContext,
JobWrapper jobWrapper, RunnerCallback callback) {
Expand All @@ -48,8 +49,7 @@ public JobRunnerDelegate(TaskTrackerAppContext appContext,
appContext.getRemotingClient(), appContext);
monitor = (TaskTrackerMonitor) appContext.getMonitor();

this.interruptor = new Interruptible() {
@Override
this.interruptor = new InterruptibleAdapter() {
public void interrupt() {
JobRunnerDelegate.this.interrupt();
}
Expand All @@ -61,7 +61,7 @@ public void run() {
try {
blockedOn(interruptor);
if (Thread.currentThread().isInterrupted()) {
interruptor.interrupt();
((InterruptibleAdapter)interruptor).interrupt();
}

LtsLoggerFactory.setLogger(logger);
Expand Down Expand Up @@ -126,15 +126,16 @@ public void run() {
}

private void interrupt() {
interrupted = true;

if(!interrupted.compareAndSet(false, true)){
return;
}
if (this.curJobRunner != null && this.curJobRunner instanceof InterruptibleJobRunner) {
((InterruptibleJobRunner) this.curJobRunner).interrupt();
}
}

private boolean isInterrupted() {
return this.interrupted;
return this.interrupted.get();
}

private void monitor(Action action) {
Expand All @@ -161,4 +162,13 @@ private static void blockedOn(Interruptible interruptible) {
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), interruptible);
}

public abstract class InterruptibleAdapter implements Interruptible {
// for > jdk7
public void interrupt(Thread thread) {
interrupt();
}

public abstract void interrupt();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

abstract class InterruptSupport {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
private Interruptible interruptor = new InterruptRead.InterruptibleAdapter() {

public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
Expand All @@ -20,7 +21,7 @@ public final boolean execute() throws InterruptedException {
blockedOn(interruptor); // 位置1
System.out.println("=======1");
if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
interruptor.interrupt();
((InterruptRead.InterruptibleAdapter)interruptor).interrupt();
System.out.println("=======2");
}
// 执行业务代码
Expand Down Expand Up @@ -91,8 +92,16 @@ public void run() {
};
t.start();
// 先让Read执行3秒
Thread.sleep(3000);
Thread.sleep(30000);
// 发出interrupt中断
t.interrupt();
// t.interrupt();
}

public static abstract class InterruptibleAdapter implements Interruptible{
public void interrupt(Thread thread) {
interrupt();
}

public abstract void interrupt();
}
}