Skip to content
Permalink
Browse files

[FIXED JENKINS-28840] Deadlock between Queue.maintain and Executor.in…

…terrupt

More fun here:

- All this originates from Executor extending Thread.
- There is funky logic in the lock handling code of the JVM that makes assumptions
  about how it might proceed with the lock when the thread holding the lock has its
  interrupt flag set.
- Really it would be better if Executor did not extend Thread as that way we wouldn't
  have to deal with some of that complexity. But OTOH we are where we are and backwards
  compatibility may make such a change not possible without a lot of breakage.
- Fixing the issue at hand, firstly requires that interrupting a Computer happens with the
  Queue lock held (to speed up tests we have Jenkins.cleanup get the lock for all Computers)
  That prevents the Queue maintain thread from getting caught
- Secondly, when removing an executor from a computer we process the removal while
  holding the Queue lock, but we move the removal itself to a separate thread if we cannot
  get the Queue lock in order to avoid deadlock.
- Also add helper methods to wrap tasks to be performed while holding the lock
  and a helper method for Runnables that exposes the tryLock functionality

(cherry picked from commit 6f343dc)
  • Loading branch information
stephenc authored and olivergondza committed Jun 15, 2015
1 parent c24c323 commit 119fcbbf98c27f0257ac1be02104e0d87acc8728
@@ -975,20 +975,23 @@ public final long getDemandStartMilliseconds() {
* Called by {@link Executor} to kill excessive executors from this computer.
*/
/*package*/ void removeExecutor(final Executor e) {
Queue.withLock(new Runnable() {
final Runnable task = new Runnable() {
@Override
public void run() {
synchronized (Computer.this) {
executors.remove(e);
addNewExecutorIfNecessary();
if (!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
if (!isAlive()) {
AbstractCIBase ciBase = Jenkins.getInstance();
ciBase.removeComputer(Computer.this);
}
}
}
});
};
if (!Queue.tryWithLock(task)) {
// JENKINS-28840 if we couldn't get the lock push the operation to a separate thread to avoid deadlocks
threadPoolForRemoting.submit(Queue.wrapWithLock(task));
}
}

/**
@@ -1011,9 +1014,14 @@ protected boolean isAlive() {
* Called from {@link Jenkins#cleanUp}.
*/
public void interrupt() {
for (Executor e : executors) {
e.interruptForShutdown();
}
Queue.withLock(new Runnable() {
@Override
public void run() {
for (Executor e : executors) {
e.interruptForShutdown();
}
}
});
}

public String getSearchUrl() {
@@ -99,7 +99,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -113,6 +112,7 @@
import org.acegisecurity.AccessDeniedException;
import org.acegisecurity.Authentication;
import org.jenkinsci.bytecode.AdaptField;
import org.jenkinsci.remoting.RoleChecker;
import org.kohsuke.stapler.HttpResponse;
import org.kohsuke.stapler.HttpResponses;
import org.kohsuke.stapler.export.Exported;
@@ -337,7 +337,7 @@ public Void call() throws Exception {
}
});

private transient final Lock lock = new ReentrantLock();
private transient final ReentrantLock lock = new ReentrantLock();

private transient final Condition condition = lock.newCondition();

@@ -1214,6 +1214,59 @@ public static void withLock(Runnable runnable) {
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
public static boolean tryWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
if (queue == null) {
runnable.run();
return true;
} else {
return queue._tryWithLock(runnable);
}
}
/**
* Wraps a {@link Runnable} with the {@link Queue} lock held.
*
* @param runnable the operation to wrap.
* @since 1.FIXME
*/
public static Runnable wrapWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? runnable : new LockedRunnable(runnable);
}

/**
* Wraps a {@link hudson.remoting.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V, T extends Throwable> hudson.remoting.Callable<V, T> wrapWithLock(hudson.remoting.Callable<V, T> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedHRCallable<>(callable);
}

/**
* Wraps a {@link java.util.concurrent.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V> java.util.concurrent.Callable<V> wrapWithLock(java.util.concurrent.Callable<V> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedJUCCallable<V>(callable);
}

@Override
protected void _await() throws InterruptedException {
condition.await();
@@ -1239,6 +1292,26 @@ protected void _withLock(Runnable runnable) {
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
protected boolean _tryWithLock(Runnable runnable) {
if (lock.tryLock()) {
try {
runnable.run();
} finally {
lock.unlock();
}
return true;
} else {
return false;
}
}

/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
@@ -1303,6 +1376,13 @@ public void maintain() {
List<BuildableItem> lostPendings = new ArrayList<BuildableItem>(pendings);
for (Computer c : Jenkins.getInstance().getComputers()) {
for (Executor e : c.getExecutors()) {
if (e.isInterrupted()) {
// JENKINS-28840 we will deadlock if we try to touch this executor while interrupt flag set
// we need to clear lost pendings as we cannot know what work unit was on this executor
// while it is interrupted. (All this dancing is a result of Executor extending Thread)
lostPendings.clear(); // we'll get them next time around when the flag is cleared.
continue;
}
if (e.isParking()) {
parked.put(e, new JobOffer(e));
}
@@ -2564,6 +2644,51 @@ public Snapshot(Set<WaitingItem> waitingList, List<BlockedItem> blockedProjects,
this.pendings = new ArrayList<BuildableItem>(pendings);
}
}

private static class LockedRunnable implements Runnable {
private final Runnable delegate;

private LockedRunnable(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
withLock(delegate);
}
}

private static class LockedJUCCallable<V> implements java.util.concurrent.Callable<V> {
private final java.util.concurrent.Callable<V> delegate;

private LockedJUCCallable(java.util.concurrent.Callable<V> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws Exception {
return withLock(delegate);
}
}

private static class LockedHRCallable<V,T extends Throwable> implements hudson.remoting.Callable<V,T> {
private static final long serialVersionUID = 1L;
private final hudson.remoting.Callable<V,T> delegate;

private LockedHRCallable(hudson.remoting.Callable<V,T> delegate) {
this.delegate = delegate;
}

@Override
public V call() throws T {
return withLock(delegate);
}

@Override
public void checkRoles(RoleChecker checker) throws SecurityException {
delegate.checkRoles(checker);
}
}

@CLIResolver
public static Queue getInstance() {
@@ -2763,13 +2763,19 @@ public void execute(Runnable command) {
LOGGER.log(SEVERE, "Failed to execute termination",e);
}

Set<Future<?>> pending = new HashSet<Future<?>>();
final Set<Future<?>> pending = new HashSet<Future<?>>();
terminating = true;
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
// JENKINS-28840 we know we will be interrupting all the Computers so get the Queue lock once for all
Queue.withLock(new Runnable() {
@Override
public void run() {
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
}
});
if(udpBroadcastThread!=null)
udpBroadcastThread.shutdown();
if(dnsMultiCast!=null)

0 comments on commit 119fcbb

Please sign in to comment.
You can’t perform that action at this time.